chadrik commented on a change in pull request #13060:
URL: https://github.com/apache/beam/pull/13060#discussion_r505933134



##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
##########
@@ -1062,34 +1104,41 @@ def close(self):
 
 
 class ControlFuture(object):
-  def __init__(self, instruction_id, response=None):
+  def __init__(self,
+               instruction_id,  # type: str
+               response=None  # type: 
Optional[beam_fn_api_pb2.InstructionResponse]
+              ):
+    # type: (...) -> None
     self.instruction_id = instruction_id
-    if response:
-      self._response = response
-    else:
-      self._response = None
+    self._response = response
+    if response is None:
       self._condition = threading.Condition()
-    self._exception = None
+    self._exception = None  # type: Optional[Exception]
 
   def is_done(self):
+    # type: () -> bool
     return self._response is not None
 
   def set(self, response):
+    # type: (beam_fn_api_pb2.InstructionResponse) -> None
     with self._condition:
       self._response = response
       self._condition.notify_all()
 
   def get(self, timeout=None):
+    # type: (Optional[float]) -> beam_fn_api_pb2.InstructionResponse
     if not self._response and not self._exception:
       with self._condition:
         if not self._response and not self._exception:
           self._condition.wait(timeout)
     if self._exception:
       raise self._exception
     else:
+      assert self._response is not None

Review comment:
       It would be good to get confirmation that this assert is correct.  
AFAICT this method should never return None.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to