damccorm commented on code in PR #37113:
URL: https://github.com/apache/beam/pull/37113#discussion_r2687842957
##########
sdks/python/apache_beam/ml/inference/model_manager.py:
##########
@@ -0,0 +1,669 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module for managing ML models in Apache Beam pipelines.
+
+This module provides classes and functions to efficiently manage multiple
+machine learning models within Apache Beam pipelines. It includes functionality
+for loading, caching, and updating models using multi-process shared memory,
+ensuring that models are reused across different workers to optimize resource
+usage and performance.
+"""
+
+import uuid
+import time
+import threading
+import subprocess
+import logging
+import gc
+import numpy as np
+from scipy.optimize import nnls
+import torch
+import heapq
+import itertools
+from collections import defaultdict, deque, Counter, OrderedDict
+from typing import Dict, Any, Tuple, Optional, Callable
+
+logger = logging.getLogger(__name__)
+
+
+class GPUMonitor:
+  def __init__(
+      self,
+      fallback_memory_mb: float = 16000.0,
+      poll_interval: float = 0.5,
+      peak_window_seconds: float = 30.0):
+    self._current_usage = 0.0
+    self._peak_usage = 0.0
+    self._total_memory = fallback_memory_mb
+    self._poll_interval = poll_interval
+    self._peak_window_seconds = peak_window_seconds
+    self._memory_history = deque()
+    self._running = False
+    self._thread = None
+    self._lock = threading.Lock()
+    self._gpu_available = self._detect_hardware()

Review Comment:
I think it would be better to defer this logic to the first time we call `start`. That will ensure that if this object gets created at job submission, we still look in the execution environment for the GPU.
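For concreteness, a minimal runnable sketch of the deferral being suggested, under the assumption that `_detect_hardware` itself stays unchanged and only its call site moves; the `LazyGPUMonitor` name, the stubbed-out probe and poll loop, and the `None`-means-not-yet-probed convention are illustrative, not part of this PR:

import threading


class LazyGPUMonitor:
  """Sketch only: hardware detection happens on first start(), not in __init__."""
  def __init__(self, fallback_memory_mb: float = 16000.0):
    self._total_memory = fallback_memory_mb
    self._running = False
    self._thread = None
    # None means "not probed yet"; the constructor never shells out to
    # nvidia-smi, so building this object at job submission is harmless.
    self._gpu_available = None

  def _detect_hardware(self) -> bool:
    return False  # stand-in for the real nvidia-smi probe in the PR

  def _poll_loop(self):
    pass  # stand-in for the real polling loop

  def start(self):
    if self._gpu_available is None:
      # First start() call: by now we are in the execution environment,
      # so this is the right place to look for a GPU.
      self._gpu_available = self._detect_hardware()
    if self._running or not self._gpu_available:
      return
    self._running = True
    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
    self._thread.start()

The upshot is that constructing the monitor on the launcher never runs nvidia-smi; the probe happens on the worker, inside the first `start()` call.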
##########
sdks/python/apache_beam/ml/inference/model_manager.py:
##########
@@ -0,0 +1,669 @@
+  def _detect_hardware(self):
+    try:
+      cmd = [
+          "nvidia-smi",
+          "--query-gpu=memory.total",
+          "--format=csv,noheader,nounits"
+      ]
+      output = subprocess.check_output(cmd, text=True).strip()
+      self._total_memory = float(output)
+      return True
+    except (FileNotFoundError, subprocess.CalledProcessError):
+      logger.warning(
+          "nvidia-smi not found or failed. Defaulting total memory to %s MB",
+          self._total_memory)
+      return False
+    except Exception as e:
+      logger.warning(
+          "Error parsing nvidia-smi output: %s. "
+          "Defaulting total memory to %s MB",
+          e,
+          self._total_memory)
+      return False
+
+  def start(self):
+    if self._running or not self._gpu_available:
+      return
+    self._running = True
+    self._thread = threading.Thread(target=self._poll_loop, daemon=True)
+    self._thread.start()
+
+  def stop(self):
+    self._running = False
+    if self._thread:
+      self._thread.join()
+
+  def reset_peak(self):
+    with self._lock:
+      now = time.time()
+      self._memory_history.clear()
+      self._memory_history.append((now, self._current_usage))
+      self._peak_usage = self._current_usage
+
+  def get_stats(self) -> Tuple[float, float, float]:
+    with self._lock:
+      return self._current_usage, self._peak_usage, self._total_memory
+
+  def refresh(self):
+    """Forces an immediate poll of the GPU."""
+    usage = self._get_nvidia_smi_used()
+    now = time.time()
+    with self._lock:
+      self._current_usage = usage
+      self._memory_history.append((now, usage))
+      # Recalculate peak immediately
+      while self._memory_history and (now - self._memory_history[0][0]
+                                      > self._peak_window_seconds):
+        self._memory_history.popleft()
+      self._peak_usage = (
+          max(m for _, m in self._memory_history)
+          if self._memory_history else usage)
+
+  def _get_nvidia_smi_used(self) -> float:
+    try:
+      cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits"
+      output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip()
+      free_memory = float(output)
+      return self._total_memory - free_memory
+    except Exception:
+      return 0.0
+
+  def _poll_loop(self):
+    while self._running:
+      usage = self._get_nvidia_smi_used()
+      now = time.time()
+      with self._lock:
+        self._current_usage = usage
+        self._memory_history.append((now, usage))
+        while self._memory_history and (now - self._memory_history[0][0]
+                                        > self._peak_window_seconds):
+          self._memory_history.popleft()
+        self._peak_usage = (
+            max(m for _, m in self._memory_history)
+            if self._memory_history else usage)
+      time.sleep(self._poll_interval)
+
+
+class ResourceEstimator:
+  def __init__(self, smoothing_factor: float = 0.2, min_data_points: int = 5):
+    self.smoothing_factor = smoothing_factor
+    self.min_data_points = min_data_points
+    self.estimates: Dict[str, float] = {}
+    self.history = defaultdict(lambda: deque(maxlen=20))
+    self.known_models = set()
+    self._lock = threading.Lock()
+
+  def is_unknown(self, model_tag: str) -> bool:
+    with self._lock:
+      return model_tag not in self.estimates
+
+  def get_estimate(self, model_tag: str, default_mb: float = 4000.0) -> float:
+    with self._lock:
+      return self.estimates.get(model_tag, default_mb)
+
+  def set_initial_estimate(self, model_tag: str, cost: float):
+    with self._lock:
+      self.estimates[model_tag] = cost
+      self.known_models.add(model_tag)
+      logger.info("Initial Profile for %s: %s MB", model_tag, cost)
+
+  def add_observation(
+      self, active_snapshot: Dict[str, int], peak_memory: float):

Review Comment:
I'm having a little bit of a hard time following this. Is `active_snapshot` a map of model tags to the number of models loaded for that tag?
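If the answer is yes, the estimation problem has a natural reading that the `nnls` import above hints at. Purely as an illustration (the snapshot shape, the tags, and the numbers below are assumptions, not a description of what `add_observation` actually does), per-tag costs could be recovered from a few such observations with non-negative least squares:

import numpy as np
from scipy.optimize import nnls

# Assumed data shape: each observation pairs a snapshot mapping
# model_tag -> number of loaded instances with the peak GPU memory (MB)
# observed while exactly those instances were resident. Numbers are made up.
observations = [
    ({"llm-small": 1}, 4100.0),
    ({"llm-small": 2}, 8050.0),
    ({"llm-small": 1, "embedder": 1}, 5900.0),
]

tags = sorted({t for snapshot, _ in observations for t in snapshot})
counts = np.array(
    [[snapshot.get(t, 0) for t in tags] for snapshot, _ in observations],
    dtype=float)
peaks = np.array([peak for _, peak in observations])

# Solve peak ~= sum(count[tag] * cost_per_instance[tag]) with costs >= 0.
costs, _residual = nnls(counts, peaks)
print(dict(zip(tags, costs.round(1))))

A sentence to that effect in the `add_observation` docstring would answer the question for future readers.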
+""" + +import uuid +import time +import threading +import subprocess +import logging +import gc +import numpy as np +from scipy.optimize import nnls +import torch +import heapq +import itertools +from collections import defaultdict, deque, Counter, OrderedDict +from typing import Dict, Any, Tuple, Optional, Callable + +logger = logging.getLogger(__name__) + + +class GPUMonitor: + def __init__( + self, + fallback_memory_mb: float = 16000.0, + poll_interval: float = 0.5, + peak_window_seconds: float = 30.0): + self._current_usage = 0.0 + self._peak_usage = 0.0 + self._total_memory = fallback_memory_mb + self._poll_interval = poll_interval + self._peak_window_seconds = peak_window_seconds + self._memory_history = deque() + self._running = False + self._thread = None + self._lock = threading.Lock() + self._gpu_available = self._detect_hardware() + + def _detect_hardware(self): + try: + cmd = [ + "nvidia-smi", + "--query-gpu=memory.total", + "--format=csv,noheader,nounits" + ] + output = subprocess.check_output(cmd, text=True).strip() + self._total_memory = float(output) + return True + except (FileNotFoundError, subprocess.CalledProcessError): + logger.warning( + "nvidia-smi not found or failed. Defaulting total memory to %s MB", + self._total_memory) + return False + except Exception as e: + logger.warning( + "Error parsing nvidia-smi output: %s. " + "Defaulting total memory to %s MB", + e, + self._total_memory) + return False + + def start(self): + if self._running or not self._gpu_available: + return + self._running = True + self._thread = threading.Thread(target=self._poll_loop, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + if self._thread: + self._thread.join() + + def reset_peak(self): + with self._lock: + now = time.time() + self._memory_history.clear() + self._memory_history.append((now, self._current_usage)) + self._peak_usage = self._current_usage + + def get_stats(self) -> Tuple[float, float, float]: + with self._lock: + return self._current_usage, self._peak_usage, self._total_memory + + def refresh(self): + """Forces an immediate poll of the GPU.""" + usage = self._get_nvidia_smi_used() + now = time.time() + with self._lock: + self._current_usage = usage + self._memory_history.append((now, usage)) + # Recalculate peak immediately + while self._memory_history and (now - self._memory_history[0][0] + > self._peak_window_seconds): + self._memory_history.popleft() + self._peak_usage = ( + max(m for _, m in self._memory_history) + if self._memory_history else usage) + + def _get_nvidia_smi_used(self) -> float: + try: + cmd = "nvidia-smi --query-gpu=memory.free --format=csv,noheader,nounits" + output = subprocess.check_output(cmd, shell=True).decode("utf-8").strip() + free_memory = float(output) + return self._total_memory - free_memory + except Exception: + return 0.0 + + def _poll_loop(self): + while self._running: + usage = self._get_nvidia_smi_used() + now = time.time() + with self._lock: + self._current_usage = usage + self._memory_history.append((now, usage)) + while self._memory_history and (now - self._memory_history[0][0] + > self._peak_window_seconds): + self._memory_history.popleft() + self._peak_usage = ( + max(m for _, m in self._memory_history) + if self._memory_history else usage) + time.sleep(self._poll_interval) + + +class ResourceEstimator: Review Comment: Could you add a pydoc explaining this class (here and elsewhere)? 
##########
sdks/python/apache_beam/ml/inference/model_manager.py:
##########
@@ -0,0 +1,669 @@
+class ResourceEstimator:

Review Comment:
This would help me review and will help future readers.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
