TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864394015
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
return self.consumer.current_element_progress()
+class GeneralPurposeConsumerSet(ConsumerSet):
+ """ConsumerSet implementation that handles all combinations of possible
edges.
+ """
+ def __init__(self,
+ counter_factory,
+ step_name, # type: str
+ output_index,
+ coder,
+ producer_type_hints,
+ consumers, # type: List[Operation]
+ producer_batch_converter):
+ super().__init__(
+ counter_factory,
+ step_name,
+ output_index,
+ consumers,
+ coder,
+ producer_type_hints)
+
+ self.producer_batch_converter = producer_batch_converter
+
+ # Partition consumers into three groups:
+ # - consumers that will be passed elements
+ # - consumers that will be passed batches (where their input batch type
+ # matches the output of the producer)
+ # - consumers that will be passed converted batches
+ self.element_consumers: List[Operation] = []
+ self.passthrough_batch_consumers: List[Operation] = []
+ other_batch_consumers: DefaultDict[
+ BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+ for consumer in consumers:
+ if not consumer.get_batching_preference().supports_batches:
+ self.element_consumers.append(consumer)
+ elif (consumer.get_input_batch_converter() ==
+ self.producer_batch_converter):
+ self.passthrough_batch_consumers.append(consumer)
+ else:
+ # Batch consumer with a mismatched batch type
+ if consumer.get_batching_preference().supports_elements:
+ # Pass it elements if we can
+ self.element_consumers.append(consumer)
+ else:
+ # As a last resort, explode and rebatch
+ consumer_batch_converter = consumer.get_input_batch_converter()
+ # This consumer supports batches, it must have a batch converter
+ assert consumer_batch_converter is not None
+ other_batch_consumers[consumer_batch_converter].append(consumer)
+
+ self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+ other_batch_consumers)
+
+ self.has_batch_consumers = (
+ self.passthrough_batch_consumers or self.other_batch_consumers)
+ self._batched_elements: List[Any] = []
+
+ def receive(self, windowed_value):
+ # type: (WindowedValue) -> None
+ self.update_counters_start(windowed_value)
+
+ for consumer in self.element_consumers:
+ cython.cast(Operation, consumer).process(windowed_value)
+
+ # TODO: Do this branching when constructing ConsumerSet
+ if self.has_batch_consumers:
+ self._batched_elements.append(windowed_value)
+
+ self.update_counters_finish()
Review Comment:
We discussed this offline. For now, this implementation will be as good as
what I'm doing in receive_batch: element counts will be correct, but byte-size
estimates can be off. Improving it will be tracked as part of the same TODO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]