JosephCatrambone-Gridware opened a new issue, #56494:
URL: https://github.com/apache/airflow/issues/56494

   ### Description
   
   As of Airflow 3.0.6, the ConsumeFromTopicOperator takes the dotted import path of a function and applies it to every message in a batch, but the function's return values are discarded and the operator returns None. Instead of dropping them, the operator could accumulate the `apply_function` results and return them so that downstream tasks can use them.
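The difference between the current and the proposed behavior can be sketched in plain Python. This is a minimal illustration, not the provider's code: `FakeMessage`, `apply_current`, `apply_proposed`, and `upper_payload` are hypothetical names, and the fake message models only the `value()` method used elsewhere in this issue.

```python
from typing import Any, Callable, Iterable, List, Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message; only .value() is modeled."""

    def __init__(self, payload: Optional[bytes]) -> None:
        self._payload = payload

    def value(self) -> Optional[bytes]:
        return self._payload


def apply_current(batch: Iterable[Any], apply_function: Callable[[Any], Any]) -> None:
    # Behavior described in this issue: each result is computed, then dropped.
    for message in batch:
        apply_function(message)
    return None


def apply_proposed(batch: Iterable[Any], apply_function: Callable[[Any], Any]) -> List[Any]:
    # Proposed behavior: keep one result per message so they can be returned.
    return [apply_function(message) for message in batch]


def upper_payload(message: FakeMessage) -> Optional[bytes]:
    # Hypothetical apply function: upper-case the payload, or skip empty messages.
    payload = message.value()
    return payload.upper() if payload else None


batch = [FakeMessage(b"a"), FakeMessage(None), FakeMessage(b"c")]
print(apply_current(batch, upper_payload))   # None
print(apply_proposed(batch, upper_payload))  # [b'A', None, b'C']
```

Because an operator's return value is pushed to XCom by default, returning the list from `execute` is what would make these results visible to downstream tasks.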
   
   ### Use case/motivation
   
   It would be nice to be able to pipeline operators and functions as we can with other Python tasks. An example:
   
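   ```python
   import json

   from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
   from airflow.sdk import task


   # Lives in my_service.py so it is importable as "my_service.prefilter_function".
   def prefilter_function(message):
       # Receives messages from my-topic-1 and my-topic-2; we extract and return some dicts.
       if message and message.value():
           decoded = json.loads(message.value().decode("utf-8"))
           if decoded.get("some_value"):
               return decoded
       return None


   @task.python
   def handle_batch(decoded_items: list[dict]):
       for item in decoded_items:
           if item:
               do_something_with_item(item)  # placeholder for the real processing


   message_batch = ConsumeFromTopicOperator(
       task_id="consume_pending_kafka_messages",
       topics=["my-topic-1", "my-topic-2"],
       apply_function="my_service.prefilter_function",
   )

   # With the proposed change, the operator's return value (the list of
   # prefilter_function results) would reach handle_batch through XCom.
   handle_batch(message_batch.output)
   ```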
   
   Written differently:
   
   Kafka message stream -> ConsumeFromTopicOperator -> [list of apply_function results, one per message in the batch] -> downstream task(s).
   
   I initially wrote this up as a bug report after spending a while with a colleague who had treated the ConsumeFromTopicOperator like other operators, only to realize that returning None is the intended behavior.
   
   I recognize that the Airflow documentation explicitly states that Airflow is _not_ designed for streaming workloads, but in this case, if we need to transform some data and distribute it to other tasks, we have to build our own operator instead of reusing the existing one. The operator documentation seems to suggest that operators should be composable.
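Without upstream support, a custom operator's `execute` would need a loop along these lines. The sketch below is a simplification under stated assumptions: `consume_and_collect` and `make_poller` are hypothetical, and `poll_batch(n)` stands in for a consumer call such as confluent-kafka's `Consumer.consume(num_messages=..., timeout=...)`, returning up to `n` messages or nothing once the poll times out. `max_messages` and `max_batch_size` are named loosely after the operator's parameters; commit handling is not modeled.

```python
from typing import Any, Callable, List, Sequence


def consume_and_collect(
    poll_batch: Callable[[int], Sequence[Any]],
    apply_function: Callable[[Any], Any],
    max_messages: int,
    max_batch_size: int,
) -> List[Any]:
    """Apply apply_function to every polled message and return all results."""
    results: List[Any] = []
    consumed = 0
    while consumed < max_messages:
        # Never request more than the remaining message budget.
        batch = poll_batch(min(max_batch_size, max_messages - consumed))
        if not batch:
            break  # Poll came back empty: treat the topic as drained for now.
        results.extend(apply_function(message) for message in batch)
        consumed += len(batch)
    # A custom operator would return this list so Airflow pushes it to XCom.
    return results


def make_poller(messages):
    """Hypothetical in-memory poller used only for this example."""
    pending = list(messages)

    def poll_batch(n: int) -> list:
        batch = pending[:n]
        del pending[:n]  # Consume the messages just handed out.
        return batch

    return poll_batch


print(consume_and_collect(make_poller(range(7)), lambda m: m * 10,
                          max_messages=5, max_batch_size=2))
# [0, 10, 20, 30, 40]
```

Stopping on an empty poll keeps the task bounded, which matters given that Airflow tasks are expected to finish rather than stream indefinitely.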
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.