saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r468328472
##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
transform.get_type_hints(),
applied_transform.full_label),
applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+ _in_combine = False
+ combine_classes = (
+ core.CombineFn,
+ core.CombinePerKey,
+ core.CombineValuesDoFn,
+ core.CombineValues,
+ core.CombineGlobally)
+
+ def enter_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = True
+
+ def leave_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = False
+
+ def visit_transform(self, applied_transform):
+ transform = applied_transform.transform
+ if isinstance(transform, core.ParDo) and not self._in_combine:
+ # Store output type hints in current transform
+ transform.fn._runtime_output_constraints = {
+ applied_transform.full_label: self.get_output_type_hints(transform)
+ }
+
+ # Store input type hints in producer transform
+ producer = applied_transform.inputs[0].producer
+ if (hasattr(producer, 'transform') and
Review comment:
An `Impulse` has no `transform` attribute.
I simplified the code to first check whether `producer` has a `transform`
attribute, and then whether `producer.transform` is an instance of `ParDo`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]