[ https://issues.apache.org/jira/browse/BEAM-8823?focusedWorklogId=662965&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-662965 ]
ASF GitHub Bot logged work on BEAM-8823: ---------------------------------------- Author: ASF GitHub Bot Created on: 08/Oct/21 20:38 Start Date: 08/Oct/21 20:38 Worklog Time Spent: 10m Work Description: y1chi commented on a change in pull request #15441: URL: https://github.com/apache/beam/pull/15441#discussion_r725287970 ########## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py ########## @@ -584,100 +679,164 @@ def _add_residuals_and_channel_splits_to_deferred_inputs( channel_split.transform_id] = channel_split.last_primary_element return pcolls_with_delayed_apps, transforms_with_channel_splits - def _run_stage(self, + def _execute_bundle(self, runner_execution_context, # type: execution.FnApiRunnerExecutionContext bundle_context_manager, # type: execution.BundleContextManager - ): - # type: (...) -> beam_fn_api_pb2.InstructionResponse - - """Run an individual stage. + bundle_input: DataInput + ) -> beam_fn_api_pb2.InstructionResponse: + """Execute a bundle end-to-end. Args: runner_execution_context (execution.FnApiRunnerExecutionContext): An object containing execution information for the pipeline. bundle_context_manager (execution.BundleContextManager): A description of the stage to execute, and its context. + bundle_input: The set of buffers to input into this bundle """ - data_input, data_output, expected_timer_output = ( - bundle_context_manager.extract_bundle_inputs_and_outputs()) - input_timers = { - } # type: Mapping[Tuple[str, str], execution.PartitionableBuffer] - worker_handler_manager = runner_execution_context.worker_handler_manager - _LOGGER.info('Running %s', bundle_context_manager.stage.name) + + # TODO(pabloem): Should move this to be done once per stage worker_handler_manager.register_process_bundle_descriptor( bundle_context_manager.process_bundle_descriptor) - # We create the bundle manager here, as it can be reused for bundles of the - # same stage, but it may have to be created by-bundle later on. + # We create the bundle manager here, as it can be reused for bundles of + # the same stage, but it may have to be created by-bundle later on. + bundle_manager = self._get_bundle_manager(bundle_context_manager) + + last_result, deferred_inputs, newly_set_timers, watermark_updates = ( + self._run_bundle( + runner_execution_context, + bundle_context_manager, + bundle_input, + bundle_context_manager.stage_data_outputs, + bundle_context_manager.stage_timer_outputs, + bundle_manager)) + + for pc_name, watermark in watermark_updates.items(): + runner_execution_context.watermark_manager.set_pcoll_watermark( + pc_name, watermark) + + if deferred_inputs: + assert (runner_execution_context.watermark_manager.get_stage_node( + bundle_context_manager.stage.name).output_watermark() + < timestamp.MAX_TIMESTAMP), ( + 'wrong timestamp for %s. ' + % runner_execution_context.watermark_manager.get_stage_node( + bundle_context_manager.stage.name)) + runner_execution_context.queues.ready_inputs.enque( + (bundle_context_manager.stage.name, DataInput(deferred_inputs, {}))) + + self._enqueue_set_timers( + runner_execution_context, + bundle_context_manager, + newly_set_timers, + bundle_input) + + # Store the required downstream side inputs into state so it is accessible + # for the worker when it runs bundles that consume this stage's output. + data_side_input = ( + runner_execution_context.side_input_descriptors_by_stage.get( + bundle_context_manager.stage.name, {})) + runner_execution_context.commit_side_inputs_to_state(data_side_input) + + buffers_to_clean = set() + known_consumers = set() + for _, buffer_id in bundle_context_manager.stage_data_outputs.items(): + for (consuming_stage_name, consuming_transform) in \ + runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id, + []): + buffer = runner_execution_context.pcoll_buffers.get( + buffer_id, ListBuffer(None)) + + if (buffer_id in runner_execution_context.pcoll_buffers and + buffer_id not in buffers_to_clean): + buffers_to_clean.add(buffer_id) + elif buffer and buffer_id in buffers_to_clean: + # If the buffer_id has already been added to buffers_to_clean, this + # means that the buffer is being consumed by two separate stages, + # so we create a copy of the buffer for every new stage. + runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy() + buffer = runner_execution_context.pcoll_buffers[buffer_id] Review comment: ah ok, I guess what confused me is why we need to write the copy back to pcoll_buffers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 662965) Time Spent: 7h 50m (was: 7h 40m) > Make FnApiRunner work by executing ready elements instead of stages > ------------------------------------------------------------------- > > Key: BEAM-8823 > URL: https://issues.apache.org/jira/browse/BEAM-8823 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core > Reporter: Pablo Estrada > Priority: P3 > Time Spent: 7h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)