saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r471169649
##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +268,89 @@ def visit_transform(self, applied_transform):
transform.get_type_hints(),
applied_transform.full_label),
applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+ _in_combine = False
+ combine_classes = (
+ core.CombineFn,
+ core.CombinePerKey,
+ core.CombineValuesDoFn,
+ core.CombineValues,
+ core.CombineGlobally)
+
+ def enter_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = True
+
+ def leave_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = False
+
+ def visit_transform(self, applied_transform):
+ transform = applied_transform.transform
+ if isinstance(transform, core.ParDo) and not self._in_combine:
+ # Prefix label with 'ParDo' if necessary
+ full_label = applied_transform.full_label
+ if not full_label.startswith('ParDo'):
+ full_label = 'ParDo(%s)' % full_label
+
+ # Store output type hints in current transform
+ transform.fn._runtime_output_constraints = {}
+ output_type_hints = self.get_output_type_hints(transform)
+ if output_type_hints:
+ transform.fn._runtime_output_constraints[full_label] = (
+ output_type_hints)
+
+ # Store input type hints in producer transform
Review comment:
Nevermind, fixed it :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]