boyuanzz commented on a change in pull request #12016: URL: https://github.com/apache/beam/pull/12016#discussion_r449736466
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -681,7 +681,21 @@ def input_for(self, transform_id, input_id):
     input_pcoll = self.process_bundle_descriptor.transforms[
         transform_id].inputs[input_id]
     for read_id, proto in self.process_bundle_descriptor.transforms.items():
+      # The GrpcRead is followed by the SDF/Process.
       if (proto.spec.urn == bundle_processor.DATA_INPUT_URN and
           input_pcoll in proto.outputs.values()):
         return read_id
+      # The GrpcRead is followed by the SDF/Truncate -> SDF/Process.

Review comment:
This is an implementation detail of how `FnApiRunner` re-schedules delayed work. The original assumption here is that any operation that can produce delayed work (a `timer` or a `DelayedBundleApplication`) must consume its input from a `GrpcRead`. So when the `FnApiRunner` receives a `DelayedBundleApplication` from `SDF/Process`, it must find the corresponding `GrpcRead` transform id. When there is a truncate step, that `GrpcRead` is the input node of `SDF/Truncate` rather than of `SDF/Process`.
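A minimal sketch of the lookup described above may make the two cases concrete. It is not the actual `FnApiRunner` code (the diff shown is truncated before the new branch); it assumes simplified stand-ins: the `Transform` dataclass, the `find_data_input` function, and both URN strings are hypothetical placeholders, not Beam's protos or constants. The sketch first looks for a data-input transform that directly produces the input PCollection (GrpcRead -> SDF/Process), and otherwise looks one hop back through a truncate transform (GrpcRead -> SDF/Truncate -> SDF/Process).

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

# Hypothetical placeholder URNs for illustration only; not Beam's actual constants.
DATA_INPUT_URN = "hypothetical:data_input"
TRUNCATE_URN = "hypothetical:sdf_truncate"


@dataclass
class Transform:
    """Hypothetical stand-in for a PTransform proto: a URN plus input/output PCollection ids."""
    urn: str
    inputs: Dict[str, str] = field(default_factory=dict)
    outputs: Dict[str, str] = field(default_factory=dict)


def find_data_input(transforms: Dict[str, Transform],
                    transform_id: str, input_id: str) -> Optional[str]:
    """Return the id of the data-input (GrpcRead-like) transform feeding transform_id.input_id."""
    input_pcoll = transforms[transform_id].inputs[input_id]
    # Case 1: GrpcRead -> SDF/Process. The read directly produces the input PCollection.
    for read_id, t in transforms.items():
        if t.urn == DATA_INPUT_URN and input_pcoll in t.outputs.values():
            return read_id
    # Case 2: GrpcRead -> SDF/Truncate -> SDF/Process. Find the truncate that produces
    # the input PCollection, then the read whose output feeds that truncate.
    for t in transforms.values():
        if t.urn == TRUNCATE_URN and input_pcoll in t.outputs.values():
            truncate_inputs = set(t.inputs.values())
            for read_id, r in transforms.items():
                if r.urn == DATA_INPUT_URN and truncate_inputs & set(r.outputs.values()):
                    return read_id
    # No data input found: this would violate the runner's assumption described above.
    return None


# Usage: both graph shapes resolve to the same read transform.
without_truncate = {
    "read": Transform(DATA_INPUT_URN, outputs={"out": "pc1"}),
    "process": Transform("process", inputs={"in": "pc1"}),
}
with_truncate = {
    "read": Transform(DATA_INPUT_URN, outputs={"out": "pc1"}),
    "truncate": Transform(TRUNCATE_URN, inputs={"in": "pc1"}, outputs={"out": "pc2"}),
    "process": Transform("process", inputs={"in": "pc2"}),
}
print(find_data_input(without_truncate, "process", "in"))
print(find_data_input(with_truncate, "process", "in"))
```

Both calls resolve to `"read"`. Looking exactly one hop back through the truncate, rather than walking the whole graph, mirrors the comment's assumption that delayed work only comes from transforms fed by a `GrpcRead`, optionally with a single `SDF/Truncate` in between.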