[ https://issues.apache.org/jira/browse/BEAM-3645?focusedWorklogId=279661&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-279661 ]
ASF GitHub Bot logged work on BEAM-3645:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Jul/19 11:12
            Start Date: 19/Jul/19 11:12
    Worklog Time Spent: 10m
      Work Description: robertwb commented on pull request #8979: [BEAM-3645] add multiplexing for python FnApiRunner
URL: https://github.com/apache/beam/pull/8979#discussion_r305303487

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
##########
@@ -92,34 +92,78 @@ class BeamFnControlServicer(beam_fn_api_pb2_grpc.BeamFnControlServicer):
   _DONE_MARKER = object()

   def __init__(self):
-    self._push_queue = queue.Queue()
     self._futures_by_id = dict()
-    self._read_thread = threading.Thread(
-        name='beam_control_read', target=self._read)
+    self._read_thread = None
     self._uid_counter = 0
     self._state = self.UNSTARTED_STATE
     self._lock = threading.Lock()
+    self._multi_worker_worker_handler = None
+    self._req_worker_mapping = {}
+    self._req_sent = collections.defaultdict(int)
+    self._req_received = collections.defaultdict(int)
+    self._logging_level = logging.getLevelName(
+        logging.getLogger().getEffectiveLevel())

   def Control(self, iterator, context):
     with self._lock:
       if self._state == self.DONE_STATE:
         return
       else:
         self._state = self.STARTED_STATE
-    self._inputs = iterator
+
+    metadata = dict((k, v) for k, v in context.invocation_metadata())
+    worker_id = metadata.get('worker_id')
+    if not worker_id:
+      raise RuntimeError('All workers communicate through gRPC should have '
+                         'worker_id. Received None.')
+
+    # wait until worker_handlers are added to MultiWorkerWorkerHandler
+    while not self._multi_worker_worker_handler:

Review comment:
So generally the pattern of sleeping until an external actor sets a private variable is error prone. An alternative would be to have an object that represents the connection to a particular worker that could be instantiated by either side.
In other words, currently this class represents both the service and the (singleton) connection, and breaking that apart could simplify things to establish the one-to-many relationship. This class would have a get_connection_handle(worker_id) method that would return a (cached by worker id) CollectionHandle class. This class would encapsulate the queue of items to send (populated by a push method that constructs and returns futures, no need to pass worker_id because it's implicit), as well as the reader thread (started when the connection comes in). In fact, most of the logic of this class would go there--BeamFnControlServicer would turn into something that simply caches and populates these when connections come in. A similar pattern would simplify the data channels.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

    Worklog Id:     (was: 279661)
    Time Spent: 25h 10m  (was: 25h)

> Support multi-process execution on the FnApiRunner
> --------------------------------------------------
>
>                 Key: BEAM-3645
>                 URL: https://issues.apache.org/jira/browse/BEAM-3645
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.2.0, 2.3.0
>            Reporter: Charles Chen
>            Assignee: Hannah Jiang
>            Priority: Major
>             Fix For: 2.15.0
>
>          Time Spent: 25h 10m
>  Remaining Estimate: 0h
>
> https://issues.apache.org/jira/browse/BEAM-3644 gave us a 15x performance
> gain over the previous DirectRunner.  We can do even better in multi-core
> environments by supporting multi-process execution in the FnApiRunner, to
> scale past Python GIL limitations.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
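
The per-worker connection object proposed in the review comment above could look roughly like the sketch below. It is an illustration under stated assumptions, not the code that was merged for this pull request. Apart from get_connection_handle and push, which the comment names, every identifier here is hypothetical (ConnectionHandle, ControlServicer, set_input, get_requests, close); the class is called ConnectionHandle to match get_connection_handle, where the comment writes CollectionHandle. Plain (request_id, payload) tuples stand in for the Fn API protobuf messages, concurrent.futures.Future stands in for whatever future type the runner uses, and no gRPC is involved.

```python
import queue
import threading
from concurrent.futures import Future


class ConnectionHandle(object):
  """Hypothetical per-worker connection: send queue, futures, reader thread."""

  _DONE_MARKER = object()

  def __init__(self, worker_id):
    self.worker_id = worker_id
    self._push_queue = queue.Queue()
    self._futures_by_id = {}
    self._uid_counter = 0
    self._lock = threading.Lock()
    self._read_thread = None

  def push(self, payload):
    """Queues a request and returns a future; worker_id is implicit."""
    with self._lock:
      self._uid_counter += 1
      request_id = 'control_%s' % self._uid_counter
      future = Future()
      self._futures_by_id[request_id] = future
    self._push_queue.put((request_id, payload))
    return future

  def set_input(self, responses):
    """Starts the reader thread once the worker's connection comes in."""
    with self._lock:
      if self._read_thread is not None:
        raise RuntimeError('Worker %s connected twice.' % self.worker_id)
      self._read_thread = threading.Thread(
          name='control_read_%s' % self.worker_id,
          target=self._read, args=(responses,))
      self._read_thread.daemon = True
      self._read_thread.start()

  def _read(self, responses):
    # Resolve the matching future for each (request_id, result) response.
    for request_id, result in responses:
      with self._lock:
        future = self._futures_by_id.pop(request_id)
      future.set_result(result)

  def get_requests(self):
    """Yields queued requests until close() is called."""
    while True:
      item = self._push_queue.get()
      if item is self._DONE_MARKER:
        return
      yield item

  def close(self):
    self._push_queue.put(self._DONE_MARKER)


class ControlServicer(object):
  """Hypothetical servicer that only caches and populates handles."""

  def __init__(self):
    self._lock = threading.Lock()
    self._handles = {}

  def get_connection_handle(self, worker_id):
    # Either side may call this first; the handle is created on demand.
    with self._lock:
      if worker_id not in self._handles:
        self._handles[worker_id] = ConnectionHandle(worker_id)
      return self._handles[worker_id]

  def control(self, responses, worker_id):
    handle = self.get_connection_handle(worker_id)
    handle.set_input(responses)
    return handle.get_requests()
```

Because the handle is created by whichever side asks for it first, the runner can call get_connection_handle(worker_id).push(...) before the worker has connected. The request simply waits in that worker's queue, so under these assumptions no loop that sleeps until a private attribute is set would be needed.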