boyuanzz commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r449736466



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -681,7 +681,21 @@ def input_for(self, transform_id, input_id):
     input_pcoll = self.process_bundle_descriptor.transforms[
         transform_id].inputs[input_id]
     for read_id, proto in self.process_bundle_descriptor.transforms.items():
+      # The GrpcRead is followed by the SDF/Process.
       if (proto.spec.urn == bundle_processor.DATA_INPUT_URN and
           input_pcoll in proto.outputs.values()):
         return read_id
+      # The GrpcRead is followed by the SDF/Truncate -> SDF/Process.

Review comment:
This is an implementation detail of how `FnApiRunner` re-schedules delayed
work. The original assumption here is that any operation that can produce
delayed work (`timer`, `DelayedBundleApplication`) must consume its input
directly from a `GrpcRead`. So when the `FnApiRunner` gets a
`DelayedBundleApplication` from `SDF/Process`, it must find the corresponding
`GrpcRead` transform id. When there is a truncate, that `GrpcRead` is the input
node of `SDF/Truncate` rather than of `SDF/Process`.
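To make the lookup concrete, here is a minimal, self-contained sketch of the idea, not the code in this PR. It models transforms with a hypothetical `Transform` dataclass instead of the real bundle descriptor protos, and the URN strings are stand-ins rather than the actual Beam constants. The helper `find_grpc_read` is likewise an illustrative name.

```python
from dataclasses import dataclass, field
from typing import Dict

# Stand-in URNs for this sketch only; these are NOT the real Beam constants.
DATA_INPUT_URN = "sketch:grpc_read"
TRUNCATE_URN = "sketch:sdf_truncate"
PROCESS_URN = "sketch:sdf_process"


@dataclass
class Transform:
    """Hypothetical, simplified stand-in for a transform proto."""
    urn: str
    inputs: Dict[str, str] = field(default_factory=dict)   # local name -> pcollection id
    outputs: Dict[str, str] = field(default_factory=dict)  # local name -> pcollection id


def _producer(transforms, pcoll, urn):
    """Return the id of the transform with the given URN that outputs pcoll."""
    for tid, t in transforms.items():
        if t.urn == urn and pcoll in t.outputs.values():
            return tid
    return None


def find_grpc_read(transforms, transform_id, input_id):
    """Find the GrpcRead feeding transform_id, looking through one truncate."""
    input_pcoll = transforms[transform_id].inputs[input_id]

    # Case 1: GrpcRead -> SDF/Process.
    read_id = _producer(transforms, input_pcoll, DATA_INPUT_URN)
    if read_id is not None:
        return read_id

    # Case 2: GrpcRead -> SDF/Truncate -> SDF/Process.
    truncate_id = _producer(transforms, input_pcoll, TRUNCATE_URN)
    if truncate_id is not None:
        for truncate_input in transforms[truncate_id].inputs.values():
            read_id = _producer(transforms, truncate_input, DATA_INPUT_URN)
            if read_id is not None:
                return read_id

    raise RuntimeError("No GrpcRead found for %s:%s" % (transform_id, input_id))


# Two toy graphs: one without and one with a truncate step.
direct = {
    "read": Transform(DATA_INPUT_URN, outputs={"out": "pc0"}),
    "process": Transform(PROCESS_URN, inputs={"in": "pc0"}),
}
with_truncate = {
    "read": Transform(DATA_INPUT_URN, outputs={"out": "pc0"}),
    "truncate": Transform(TRUNCATE_URN, inputs={"in": "pc0"}, outputs={"out": "pc1"}),
    "process": Transform(PROCESS_URN, inputs={"in": "pc1"}),
}
```

In both toy graphs, `find_grpc_read(graph, "process", "in")` resolves to `"read"`, which is the transform id the runner would need in order to re-schedule the delayed application.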



