zero323 commented on a change in pull request #34439:
URL: https://github.com/apache/spark/pull/34439#discussion_r754106603



##########
File path: python/pyspark/broadcast.py
##########
@@ -62,35 +81,44 @@ class Broadcast(object):
     >>> large_broadcast = sc.broadcast(range(10000))
     """
 
-    def __init__(self, sc=None, value=None, pickle_registry=None, path=None, 
sock_file=None):
+    def __init__(
+        self,
+        sc: Optional["SparkContext"] = None,
+        value: Optional[T] = None,
+        pickle_registry: Optional["BroadcastPickleRegistry"] = None,
+        path: Optional[Any] = None,
+        sock_file: Optional[Any] = None,
+    ):
         """
         Should not be called directly by users -- use 
:meth:`SparkContext.broadcast`
         instead.
         """
         if sc is not None:
             # we're on the driver.  We want the pickled data to end up in a 
file (maybe encrypted)
-            f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
+            f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)  # type: 
ignore[attr-defined]
             self._path = f.name
-            self._sc = sc
-            self._python_broadcast = 
sc._jvm.PythonRDD.setupBroadcast(self._path)
-            if sc._encryption_enabled:
+            self._sc: Optional["SparkContext"] = sc
+            self._python_broadcast = 
sc._jvm.PythonRDD.setupBroadcast(self._path)  # type: ignore[attr-defined]
+            if sc._encryption_enabled:  # type: ignore[attr-defined]
                 # with encryption, we ask the jvm to do the encryption for us, 
we send it data
                 # over a socket
                 port, auth_secret = 
self._python_broadcast.setupEncryptionServer()
                 (encryption_sock_file, _) = local_connect_and_auth(port, 
auth_secret)
-                broadcast_out = ChunkedStream(encryption_sock_file, 8192)
+                broadcast_out: Union[ChunkedStream, IO[bytes]] = ChunkedStream(
+                    encryption_sock_file, 8192
+                )
             else:
                 # no encryption, we can just write pickled data directly to 
the file from python
                 broadcast_out = f
-            self.dump(value, broadcast_out)
-            if sc._encryption_enabled:
+            self.dump(cast(T, value), broadcast_out)

Review comment:
       @HyukjinKwon @itholic  @ueshin  @xinrong-databricks  WDYT? 
   
    Do we need a fancy overload on `__init__` here? Something along these lines
   
    ```python
        @overload  # On driver
        def __init__(self: Broadcast[T], sc: SparkContext, value: T, pickle_registry: BroadcastPickleRegistry): ...
        @overload  # On worker without decryption server
        def __init__(self: Broadcast[Any], *, path: str): ...       # This is a placeholder for arbitrary value, so not Broadcast[None]
        @overload  # On worker with decryption server
        def __init__(self: Broadcast[Any], *, sock_file: str): ...  # Ditto
    ```
   
    `cast` definitely seems wrong, because we know that this value can legitimately be `None`
in this control flow (this is in contrast to many optional fields we access where
we know that, under normal operating conditions, they are not null). If anything, the
mypy error should be suppressed with a `# type: ignore` comment rather than masked by a `cast`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to