JosephCatrambone-Gridware opened a new issue, #56494:
URL: https://github.com/apache/airflow/issues/56494
### Description
As of Airflow 3.0.6, the `ConsumeFromTopicOperator` takes a function by its dotted
import path (`apply_function`) and applies it to every message in a batch, but
the function's return values are discarded and the operator's `execute()` returns
`None`. Instead of dropping them, the operator could accumulate the
`apply_function` results and return them so that downstream tasks can use them.
### Use case/motivation
It would be nice to be able to pipeline operators and functions, passing data
from one task to the next, as we can with other Python tasks. For example:
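```python
# my_service.py (importable by the worker, so "my_service.prefilter_function" resolves)
import json


def prefilter_function(message):
    # This will get messages from my-topic-1 and my-topic-2. We will
    # extract and return some dicts.
    if message and message.value():
        decoded = json.loads(message.value().decode("utf-8"))
        if decoded.get("some_value"):
            return decoded
    return None


# DAG file
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.sdk import DAG, task


@task.python
def handle_batch(decoded_items: list[dict]):
    for item in decoded_items:
        if item:
            do_something_with_item(item)  # placeholder for your own processing


with DAG(dag_id="consume_and_handle_kafka_messages"):
    message_batch = ConsumeFromTopicOperator(
        task_id="consume_pending_kafka_messages",
        topics=["my-topic-1", "my-topic-2"],
        apply_function="my_service.prefilter_function",
    )
    # Desired: handle_batch receives the list of apply_function results via XCom.
    # Today execute() returns None, so there is nothing to pass along.
    handle_batch(message_batch.output)
```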
Written differently:
Kafka message stream -> ConsumeFromTopicOperator -> [one `apply_function` result
per consumed message] -> downstream task(s).
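The proposed behavior can be sketched without Airflow or a Kafka broker. This is a minimal illustration of the requested semantics, not the provider's implementation: `apply_and_collect`, `FakeMessage`, and the `drop_none` flag are hypothetical names introduced here, and `FakeMessage` models only the `value()` method of a Kafka message. It keeps one result per consumed message, in order, matching the list shape described above; inside the operator, the returned list would become the task's XCom return value.

```python
import json
from typing import Any, Callable, Iterable, List, Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only value() is modeled."""

    def __init__(self, payload: Optional[bytes]) -> None:
        self._payload = payload

    def value(self) -> Optional[bytes]:
        return self._payload


def apply_and_collect(
    messages: Iterable[Any],
    apply_function: Callable[[Any], Any],
    drop_none: bool = False,
) -> List[Any]:
    """Apply apply_function to each message and return the results in order.

    Hypothetical helper: the operator currently discards these return values;
    this sketch accumulates them instead so they could be handed downstream.
    """
    results: List[Any] = []
    for message in messages:
        result = apply_function(message)
        if drop_none and result is None:
            continue  # optionally skip messages the function filtered out
        results.append(result)
    return results


def prefilter_function(message: Any) -> Optional[dict]:
    # Same filtering logic as the example above.
    if message and message.value():
        decoded = json.loads(message.value().decode("utf-8"))
        if decoded.get("some_value"):
            return decoded
    return None


batch = [
    FakeMessage(b'{"some_value": 1, "id": "a"}'),
    FakeMessage(b'{"some_value": 0, "id": "b"}'),
    FakeMessage(None),
]
print(apply_and_collect(batch, prefilter_function))
```

Keeping the `None` placeholders preserves a one-to-one mapping with the consumed batch (which is why `handle_batch` checks `if item:`); passing `drop_none=True` would instead hand downstream tasks only the messages that survived the filter.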
I initially wrote this as a bug report after spending a while with a
colleague who had treated the ConsumeFromTopicOperator like other operators,
before realizing that returning None is the intended behavior.
I recognize that the Airflow documentation explicitly states that Airflow is
_not_ designed for streaming workloads. Even so, if we need to transform some
data and distribute it to other tasks, we currently have to build our own
operators instead of reusing the existing ones. The Operator documentation seems
to suggest that Operators are meant to be composable.
### Related issues
_No response_
### Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)