lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r448655800
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -1453,14 +1447,18 @@ def split_manager(num_elements):
yield 0
breakpoint.clear()
- # Everything should be perfectly split.
+ # Everything should be perfectly split.
Review comment:
```suggestion
# Everything should be perfectly split.
```
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -681,7 +681,21 @@ def input_for(self, transform_id, input_id):
input_pcoll = self.process_bundle_descriptor.transforms[
transform_id].inputs[input_id]
for read_id, proto in self.process_bundle_descriptor.transforms.items():
+ # The GrpcRead is followed by the SDF/Process.
if (proto.spec.urn == bundle_processor.DATA_INPUT_URN and
input_pcoll in proto.outputs.values()):
return read_id
+ # The GrpcRead is follwoed by the SDF/Truncate -> SDF/Process.
Review comment:
```suggestion
# The GrpcRead is followed by the SDF/Truncate -> SDF/Process.
```
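For context, a hypothetical sketch of the second lookup this comment
describes (the added code is truncated in the hunk above, so the loop
below is an assumption, not the PR's actual implementation):
```python
# Hypothetical sketch (not the PR's actual code): continuing inside the
# `for read_id, proto in ...` loop above, handle the shape
# GrpcRead -> SDF/Truncate -> SDF/Process by walking one extra hop.
if proto.spec.urn == bundle_processor.DATA_INPUT_URN:
  for _, truncate in self.process_bundle_descriptor.transforms.items():
    if (set(proto.outputs.values()) & set(truncate.inputs.values()) and
        input_pcoll in truncate.outputs.values()):
      return read_id
```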
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1079,6 +1079,13 @@ def construct_bundle_application(self,
):
# type: (...) -> beam_fn_api_pb2.BundleApplication
transform_id, main_input_tag, main_input_coder, outputs = op.input_info
+ # The main_input_coder should be the main_input_coder of
+ # SdfTruncateSizedRestrictions if SdfProcessSizedElements is following
+ # SdfTruncateSizedRestrictions.
+ if (isinstance(op, operations.SdfProcessSizedElements) and
Review comment:
I don't think this is necessary.
The bundle application should contain the splits/residuals for the
SdfProcessSizedElements and should use its input coder, not the input
coder of the sdf_truncate_op. The runner can feed the split back through
the truncate operation or directly to the SdfProcessSizedElements,
depending on how it chose to construct the graph.
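In other words, a minimal sketch of the suggested shape (`element` is a
hypothetical residual value; the surrounding method body is assumed, not
quoted from the PR):
```python
# Sketch: build the application from op's own input_info, with no
# SdfTruncateSizedRestrictions special case; the runner decides where
# to route the residual when it constructs the graph.
transform_id, main_input_tag, main_input_coder, outputs = op.input_info
return beam_fn_api_pb2.BundleApplication(
    transform_id=transform_id,
    input_id=main_input_tag,
    element=main_input_coder.get_impl().encode_nested(element))
```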
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
##########
@@ -681,7 +681,21 @@ def input_for(self, transform_id, input_id):
input_pcoll = self.process_bundle_descriptor.transforms[
transform_id].inputs[input_id]
for read_id, proto in self.process_bundle_descriptor.transforms.items():
+ # The GrpcRead is followed by the SDF/Process.
if (proto.spec.urn == bundle_processor.DATA_INPUT_URN and
input_pcoll in proto.outputs.values()):
return read_id
+ # The GrpcRead is follwoed by the SDF/Truncate -> SDF/Process.
Review comment:
Why did this need to change?
##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -743,11 +743,36 @@ def pcollection_count_monitoring_infos(self,
tag_to_pcollection_id):
return infos
+class SdfTruncateSizedRestrictions(DoOperation):
+ def __init__(self, *args, **kwargs):
+ super(SdfTruncateSizedRestrictions, self).__init__(*args, **kwargs)
+ self.sdf_process_op = None
+
+ def current_element_progress(self):
+ # type: () -> Optional[iobase.RestrictionProgress]
+ return self.sdf_process_op.current_element_progress()
+
+ def try_split(self, fraction_of_remainder): # type: (...) -> Optional[Any]
+ result = self.sdf_process_op.try_split(fraction_of_remainder)
+ if result is not None:
+ return result
+ return None
+
+ def add_receiver(self, operation, output_index=0):
Review comment:
Why did we go down this path instead of replicating what
DataInputOperation does, where it accesses receivers[0] and invokes
progress/split on it directly? That invokes progress/split on the
ConsumerSet, which only forwards the call if it is a
SingletonConsumerSet.
This transform always has a single output, so receivers[0] should
always be populated.
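A minimal sketch of that alternative, for comparison (hedged: method
names follow the hunk above and Beam's ConsumerSet conventions, but this
is an illustration, not the actual implementation):
```python
class SdfTruncateSizedRestrictions(DoOperation):
  def current_element_progress(self):
    # type: () -> Optional[iobase.RestrictionProgress]
    # receivers[0] is the single output's ConsumerSet; a
    # SingletonConsumerSet forwards the call to its one consumer
    # (here, the downstream SDF/Process operation).
    return self.receivers[0].current_element_progress()

  def try_split(self, fraction_of_remainder):
    # type: (...) -> Optional[Any]
    return self.receivers[0].try_split(fraction_of_remainder)
```
This would avoid wiring an explicit sdf_process_op back-reference
through add_receiver.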