This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 857d9f9 [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py 857d9f9 is described below commit 857d9f93a889f35220011f2e4a45aaf747c1e894 Author: dchvn <dgd_contribu...@viettel.com.vn> AuthorDate: Thu Oct 21 10:13:32 2021 +0900 [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py ### What changes were proposed in this pull request? Inline type hints for python/pyspark/resource/requests.py ### Why are the changes needed? Currently, there are type hint stub files (e.g. python/pyspark/resource/requests.pyi) to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #34321 from dchvn/SPARK-37033. Authored-by: dchvn <dgd_contribu...@viettel.com.vn> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/resource/requests.py | 111 ++++++++++++++++++++++++++--------- python/pyspark/resource/requests.pyi | 83 -------------------------- 2 files changed, 83 insertions(+), 111 deletions(-) diff --git a/python/pyspark/resource/requests.py b/python/pyspark/resource/requests.py index 4deb22b..169ee1f 100644 --- a/python/pyspark/resource/requests.py +++ b/python/pyspark/resource/requests.py @@ -14,8 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from typing import overload, Optional, Dict -from pyspark.util import _parse_memory +from py4j.java_gateway import JavaObject, JVMView + +from pyspark.util import _parse_memory # type: ignore[attr-defined] class ExecutorResourceRequest(object): @@ -61,26 +64,33 @@ class ExecutorResourceRequest(object): ----- This API is evolving. 
""" - def __init__(self, resourceName, amount, discoveryScript="", vendor=""): + + def __init__( + self, + resourceName: str, + amount: int, + discoveryScript: str = "", + vendor: str = "", + ): self._name = resourceName self._amount = amount self._discovery_script = discoveryScript self._vendor = vendor @property - def resourceName(self): + def resourceName(self) -> str: return self._name @property - def amount(self): + def amount(self) -> int: return self._amount @property - def discoveryScript(self): + def discoveryScript(self) -> str: return self._discovery_script @property - def vendor(self): + def vendor(self) -> str: return self._vendor @@ -103,9 +113,25 @@ class ExecutorResourceRequests(object): _PYSPARK_MEM = "pyspark.memory" _OFFHEAP_MEM = "offHeap" - def __init__(self, _jvm=None, _requests=None): + @overload + def __init__(self, _jvm: JVMView): + ... + + @overload + def __init__( + self, + _jvm: None = ..., + _requests: Optional[Dict[str, ExecutorResourceRequest]] = ..., + ): + ... 
+ + def __init__( + self, + _jvm: Optional[JVMView] = None, + _requests: Optional[Dict[str, ExecutorResourceRequest]] = None, + ): from pyspark import SparkContext - _jvm = _jvm or SparkContext._jvm + _jvm = _jvm or SparkContext._jvm # type: ignore[attr-defined] if _jvm is not None: self._java_executor_resource_requests = \ _jvm.org.apache.spark.resource.ExecutorResourceRequests() @@ -124,9 +150,9 @@ class ExecutorResourceRequests(object): v.discoveryScript, v.vendor) else: self._java_executor_resource_requests = None - self._executor_resources = {} + self._executor_resources: Dict[str, ExecutorResourceRequest] = {} - def memory(self, amount): + def memory(self, amount: str) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.memory(amount) else: @@ -134,7 +160,7 @@ class ExecutorResourceRequests(object): _parse_memory(amount)) return self - def memoryOverhead(self, amount): + def memoryOverhead(self, amount: str) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.memoryOverhead(amount) else: @@ -142,7 +168,7 @@ class ExecutorResourceRequests(object): ExecutorResourceRequest(self._OVERHEAD_MEM, _parse_memory(amount)) return self - def pysparkMemory(self, amount): + def pysparkMemory(self, amount: str) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.pysparkMemory(amount) else: @@ -150,7 +176,7 @@ class ExecutorResourceRequests(object): ExecutorResourceRequest(self._PYSPARK_MEM, _parse_memory(amount)) return self - def offheapMemory(self, amount): + def offheapMemory(self, amount: str) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.offHeapMemory(amount) else: @@ -158,14 +184,20 @@ class ExecutorResourceRequests(object): ExecutorResourceRequest(self._OFFHEAP_MEM, 
_parse_memory(amount)) return self - def cores(self, amount): + def cores(self, amount: int) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.cores(amount) else: self._executor_resources[self._CORES] = ExecutorResourceRequest(self._CORES, amount) return self - def resource(self, resourceName, amount, discoveryScript="", vendor=""): + def resource( + self, + resourceName: str, + amount: int, + discoveryScript: str = "", + vendor: str = "", + ) -> "ExecutorResourceRequests": if self._java_executor_resource_requests is not None: self._java_executor_resource_requests.resource(resourceName, amount, discoveryScript, vendor) @@ -175,7 +207,7 @@ class ExecutorResourceRequests(object): return self @property - def requests(self): + def requests(self) -> Dict[str, ExecutorResourceRequest]: if self._java_executor_resource_requests is not None: result = {} execRes = self._java_executor_resource_requests.requestsJMap() @@ -210,16 +242,16 @@ class TaskResourceRequest(object): ----- This API is evolving. """ - def __init__(self, resourceName, amount): + def __init__(self, resourceName: str, amount: float): self._name = resourceName self._amount = float(amount) @property - def resourceName(self): + def resourceName(self) -> str: return self._name @property - def amount(self): + def amount(self) -> float: return self._amount @@ -239,12 +271,30 @@ class TaskResourceRequests(object): _CPUS = "cpus" - def __init__(self, _jvm=None, _requests=None): + @overload + def __init__(self, _jvm: JVMView): + ... + + @overload + def __init__( + self, + _jvm: None = ..., + _requests: Optional[Dict[str, TaskResourceRequest]] = ..., + ): + ... 
+ + def __init__( + self, + _jvm: Optional[JVMView] = None, + _requests: Optional[Dict[str, TaskResourceRequest]] = None, + ): from pyspark import SparkContext - _jvm = _jvm or SparkContext._jvm + _jvm = _jvm or SparkContext._jvm # type: ignore[attr-defined] if _jvm is not None: - self._java_task_resource_requests = \ - SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests() + self._java_task_resource_requests: Optional[JavaObject] = ( + SparkContext._jvm.org # type: ignore[attr-defined] + .apache.spark.resource.TaskResourceRequests() + ) if _requests is not None: for k, v in _requests.items(): if k == self._CPUS: @@ -253,24 +303,29 @@ class TaskResourceRequests(object): self._java_task_resource_requests.resource(v.resourceName, v.amount) else: self._java_task_resource_requests = None - self._task_resources = {} + self._task_resources: Dict[str, TaskResourceRequest] = {} - def cpus(self, amount): + def cpus(self, amount: int) -> "TaskResourceRequests": if self._java_task_resource_requests is not None: self._java_task_resource_requests.cpus(amount) else: self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount) return self - def resource(self, resourceName, amount): + def resource( + self, + resourceName: str, + amount: float + ) -> "TaskResourceRequests": if self._java_task_resource_requests is not None: - self._java_task_resource_requests.resource(resourceName, float(amount)) + self._java_task_resource_requests.resource( + resourceName, float(amount)) else: self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount) return self @property - def requests(self): + def requests(self) -> Dict[str, TaskResourceRequest]: if self._java_task_resource_requests is not None: result = {} taskRes = self._java_task_resource_requests.requestsJMap() diff --git a/python/pyspark/resource/requests.pyi b/python/pyspark/resource/requests.pyi deleted file mode 100644 index 6ba14d6..0000000 --- a/python/pyspark/resource/requests.pyi +++ 
/dev/null @@ -1,83 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from typing import overload, Dict, Optional - -from py4j.java_gateway import JVMView # type: ignore[import] - -class ExecutorResourceRequest: - def __init__( - self, - resourceName: str, - amount: int, - discoveryScript: str = ..., - vendor: str = ..., - ) -> None: ... - @property - def resourceName(self) -> str: ... - @property - def amount(self) -> int: ... - @property - def discoveryScript(self) -> str: ... - @property - def vendor(self) -> str: ... - -class ExecutorResourceRequests: - @overload - def __init__(self, _jvm: JVMView) -> None: ... - @overload - def __init__( - self, - _jvm: None = ..., - _requests: Optional[Dict[str, ExecutorResourceRequest]] = ..., - ) -> None: ... - def memory(self, amount: str) -> ExecutorResourceRequests: ... - def memoryOverhead(self, amount: str) -> ExecutorResourceRequests: ... - def pysparkMemory(self, amount: str) -> ExecutorResourceRequests: ... - def offheapMemory(self, amount: str) -> ExecutorResourceRequests: ... - def cores(self, amount: int) -> ExecutorResourceRequests: ... 
- def resource( - self, - resourceName: str, - amount: int, - discoveryScript: str = ..., - vendor: str = ..., - ) -> ExecutorResourceRequests: ... - @property - def requests(self) -> Dict[str, ExecutorResourceRequest]: ... - -class TaskResourceRequest: - def __init__(self, resourceName: str, amount: float) -> None: ... - @property - def resourceName(self) -> str: ... - @property - def amount(self) -> float: ... - -class TaskResourceRequests: - @overload - def __init__(self, _jvm: JVMView) -> None: ... - @overload - def __init__( - self, - _jvm: None = ..., - _requests: Optional[Dict[str, TaskResourceRequest]] = ..., - ) -> None: ... - def cpus(self, amount: int) -> TaskResourceRequests: ... - def resource(self, resourceName: str, amount: float) -> TaskResourceRequests: ... - @property - def requests(self) -> Dict[str, TaskResourceRequest]: ... --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org