zero323 commented on a change in pull request #34439: URL: https://github.com/apache/spark/pull/34439#discussion_r754106603
########## File path: python/pyspark/broadcast.py ########## @@ -62,35 +81,44 @@ class Broadcast(object): >>> large_broadcast = sc.broadcast(range(10000)) """ - def __init__(self, sc=None, value=None, pickle_registry=None, path=None, sock_file=None): + def __init__( + self, + sc: Optional["SparkContext"] = None, + value: Optional[T] = None, + pickle_registry: Optional["BroadcastPickleRegistry"] = None, + path: Optional[Any] = None, + sock_file: Optional[Any] = None, + ): """ Should not be called directly by users -- use :meth:`SparkContext.broadcast` instead. """ if sc is not None: # we're on the driver. We want the pickled data to end up in a file (maybe encrypted) - f = NamedTemporaryFile(delete=False, dir=sc._temp_dir) + f = NamedTemporaryFile(delete=False, dir=sc._temp_dir) # type: ignore[attr-defined] self._path = f.name - self._sc = sc - self._python_broadcast = sc._jvm.PythonRDD.setupBroadcast(self._path) - if sc._encryption_enabled: + self._sc: Optional["SparkContext"] = sc + self._python_broadcast = sc._jvm.PythonRDD.setupBroadcast(self._path) # type: ignore[attr-defined] + if sc._encryption_enabled: # type: ignore[attr-defined] # with encryption, we ask the jvm to do the encryption for us, we send it data # over a socket port, auth_secret = self._python_broadcast.setupEncryptionServer() (encryption_sock_file, _) = local_connect_and_auth(port, auth_secret) - broadcast_out = ChunkedStream(encryption_sock_file, 8192) + broadcast_out: Union[ChunkedStream, IO[bytes]] = ChunkedStream( + encryption_sock_file, 8192 + ) else: # no encryption, we can just write pickled data directly to the file from python broadcast_out = f - self.dump(value, broadcast_out) - if sc._encryption_enabled: + self.dump(cast(T, value), broadcast_out) Review comment: @HyukjinKwon @itholic @ueshin @xinrong-databricks WDYT? Do we need a fancy overload on `__init__` here? 
Something along these lines ```python @overload # On driver def __init__(self: Broadcast[T], sc: SparkContext, value: T, pickle_registry: BroadcastPickleRegistry): ... @overload # On worker without decryption server def __init__(self: Broadcast[Any], *, path: str): ... # This is a placeholder for arbitrary value, so not Broadcast[None] @overload # On worker with decryption server def __init__(self: Broadcast[Any], *, sock_file: str): ... # Ditto ``` `cast` definitely seems wrong, because we know that this thing can be `None` in this control flow (this is in contrast to many optional fields we access and know that, under normal operating conditions, are not null). If anything, it should be ignored. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org