lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448663612
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1079,6 +1079,13 @@ def construct_bundle_application(self,
):
# type: (...) -> beam_fn_api_pb2.BundleApplication
transform_id, main_input_tag, main_input_coder, outputs = op.input_info
+ # The main_input_coder should be the main_input_coder of
+ # SdfTruncateSizedRestrictions if SdfProcessSizedElements is following
+ # SdfTruncateSizedRestrictions.
+ if (isinstance(op, operations.SdfProcessSizedElements) and
Review comment:
I don't think this is necessary.
The bundle application should carry the splits/residuals for
SdfProcessSizedElements, so it should use that operation's input info and not
the input info of the sdf_truncate_op. Depending on how it chooses to construct
the graph, the runner can feed the split back through the truncate operation or
feed it directly to SdfProcessSizedElements.
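
To illustrate the point, here is a minimal, self-contained sketch. It does not use Beam's real classes: `FakeOp`, `FakeBundleApplication`, and the transform and coder names are hypothetical stand-ins, and only the four-field shape of `input_info` is taken from the diff above. The idea is that the residual is described by the input info of the operation that was split, with no special case for a preceding truncate operation.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class FakeOp:
    """Hypothetical stand-in for an SDF operation (not Beam's real class)."""
    # (transform_id, main_input_tag, main_input_coder, outputs)
    input_info: Tuple[str, str, str, Tuple[str, ...]]


@dataclass
class FakeBundleApplication:
    """Hypothetical stand-in for beam_fn_api_pb2.BundleApplication."""
    transform_id: str
    input_id: str
    coder: str
    element: bytes


def construct_bundle_application(op: FakeOp, element: bytes) -> FakeBundleApplication:
    # Always describe the residual with the split operation's own input info.
    transform_id, main_input_tag, main_input_coder, _outputs = op.input_info
    return FakeBundleApplication(
        transform_id=transform_id,
        input_id=main_input_tag,
        coder=main_input_coder,
        element=element,
    )


# Assumed names, for illustration only.
truncate_op = FakeOp(("sdf_truncate", "input", "truncate_coder", ("out",)))
process_op = FakeOp(("sdf_process", "input", "process_coder", ("out",)))

# The split happened in the process op, so the residual refers to it.
residual = construct_bundle_application(process_op, b"residual-restriction")
print(residual.transform_id, residual.coder)
```

With this shape, the runner sees a residual addressed to the process transform and is free to route it either through the truncate operation or straight to SdfProcessSizedElements.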