spencerhuang commented on issue #64205:
URL: https://github.com/apache/airflow/issues/64205#issuecomment-4135596134

   > > Stale AssetEvent rows in Airflow's Postgres Metastore were separate from 
Kafka offsets.
   > 
   > Can you expand on this a little bit more here? What do you mean by 
"separate"?
   > 
   > I'll preface this by saying that I am NOT a Kafka expert by any means. But 
for all `MessageQueueTriggers` (including Kafka), the logic is quite simple. 
When a message lands in the message queue an `AssetEvent` is created, 
triggering a downstream DAG (or DAGs). What's really being executed is this 
code 
(https://github.com/apache/airflow/blob/main/providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py#L106-L143),
 just checking for a simple event.
   
   @jroachgolf84 Thank you for the quick response. Let me provide some context 
on how I use Kafka in conjunction with Airflow.
   
   In Kafka, a Poison Pill is a specific message that consistently fails to be 
processed, often due to a corrupted message, causing the consumer to get 
"stuck" because it cannot move past the failing offset. To prevent this single 
message from stopping the entire data pipeline, we use a Dead Letter Queue 
(DLQ). A DLQ is a separate, dedicated Kafka topic where the consumer 
automatically reroutes these problematic messages after a few failed attempts. 
This allows the main pipeline to continue running smoothly while isolating the 
"poison" data for manual inspection and fixing later.
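To make the DLQ pattern concrete, here is a minimal in-memory sketch of the routing logic (plain Python, no real Kafka client; `MAX_ATTEMPTS` and the queue shapes are my own illustrative names):

```python
MAX_ATTEMPTS = 3  # failed attempts before a message is rerouted to the DLQ

def consume(messages, process):
    """Route messages that keep failing to a DLQ instead of blocking the pipeline."""
    processed, dlq = [], []
    for msg in messages:
        for attempt in range(1, MAX_ATTEMPTS + 1):
            try:
                processed.append(process(msg))
                break
            except ValueError:
                if attempt == MAX_ATTEMPTS:
                    dlq.append(msg)  # isolate the poison pill; keep consuming
    return processed, dlq
```

The main loop never gets "stuck": a message that exhausts its attempts lands in the DLQ for later inspection, and consumption moves on to the next offset.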
   
   To address this classic, common Kafka issue, here is the use-case:
   ```python
   # Event-driven DAG triggered by the cdc_events Asset:
   with DAG(
       dag_id="cdc_processor",
       schedule=[cdc_asset],  # Triggered by AssetEvents
       ...
   ) as dag:
       result = process_and_trigger()   # @task (retries=3, exp backoff)
       handle_dlq_task = handle_dlq()   # @task (trigger_rule=ONE_FAILED)
       result >> handle_dlq_task
   ```
`process_and_trigger`: reads the event payload from 
`context["triggering_asset_events"]`, queries the data from a business DB, builds 
the DAG run conf, starts the on-demand DAG via the Airflow REST API, and 
optionally emits an audit event.
   
`handle_dlq`: fires when `process_and_trigger` fails after 3 retries; persists 
the failed message to the `dead_letter_messages` table and the Kafka DLQ topic.
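For illustration, the conf-building step inside `process_and_trigger` could look roughly like this. The payload shape (an `extra` dict carrying `offset` and `key`) is an assumption about my setup, not an Airflow-defined contract, and the events are modeled as plain dicts here:

```python
def build_dag_run_conf(triggering_asset_events):
    """Flatten triggering-event payloads into the conf for the on-demand DAG."""
    payloads = []
    for events in triggering_asset_events.values():  # one event list per asset
        for event in events:
            extra = event.get("extra") or {}
            payloads.append({"offset": extra.get("offset"),
                             "key": extra.get("key")})
    return {"messages": payloads}
```

The resulting dict is what gets posted as `conf` when triggering the on-demand DAG through the REST API.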
   
**Now, the divergence (the "separation" I mentioned above)**
   
A malformed Kafka message causes a performance degradation or crash in the 
Triggerer before a DLQ write can occur. To restore service, an on-call engineer 
will naturally perform a Kafka offset reset (current + 1) to skip the bad 
record. However, because the AssetWatcher may have already partially emitted 
AssetEvents into Postgres before the stall/crash, the offset reset is 
insufficient. Airflow will still "phantom-process" those ghost events from its 
own database, even though the operator explicitly signaled the system to skip 
that data. We now have two separate, competing "sources of truth" (Kafka offsets 
vs. Airflow Asset state) that are fundamentally out of sync.
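The divergence can be shown with a toy simulation of the two states. All names here (`KafkaState`, `MetadataDB`) are illustrative stand-ins, not Airflow internals:

```python
class KafkaState:
    """Stand-in for the consumer group's committed offset."""
    def __init__(self):
        self.offset = 0
    def reset_past(self, bad_offset):
        self.offset = bad_offset + 1  # operator skips the poison pill

class MetadataDB:
    """Stand-in for pending AssetEvent rows in the Airflow metadata DB."""
    def __init__(self):
        self.pending_asset_events = []
    def emit(self, offset):
        self.pending_asset_events.append(offset)
    def drain(self):
        events, self.pending_asset_events = self.pending_asset_events, []
        return events

kafka, db = KafkaState(), MetadataDB()
for off in (0, 1, 2):       # AssetWatcher emits events before the stall/crash
    db.emit(off)
kafka.reset_past(2)         # on-call skips the bad record at offset 2
phantoms = db.drain()       # Airflow still schedules all three events
```

After the reset, the Kafka side believes offset 2 was skipped, yet the event for offset 2 is still sitting in the metadata DB and will be processed anyway.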
   
This architectural separation also creates problems during recovery. If 
an operator resets Kafka to `latest` to shed load during a lag spike or a 
slowdown in Airflow, they expect the system to stop processing old data. 
Instead, Airflow continues to churn through the backlog of pending AssetEvents 
stored in the metadata DB. Without a unified state, we lose the ability to 
perform a Point-in-Time Recovery. Resetting one does not reset the other, 
making a clean "replay" of Kafka events difficult to coordinate. By decoupling 
the ingestion state from the orchestration state, we have traded operational 
simplicity for a system where the left hand (Kafka) no longer knows what the 
right hand (Airflow) is doing. I hope this clarifies why the current separation 
is a significant concern for production-grade, event-driven pipelines.
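For completeness, here is a sketch of what a unified reset could look like, where skipping offsets on the Kafka side also discards the matching pending AssetEvents. `coordinated_reset` is a hypothetical helper of my own; nothing like it exists in Airflow today:

```python
def coordinated_reset(kafka_offset, pending_events, skip_up_to):
    """Advance the consumer offset and drop already-emitted events atomically.

    pending_events is a list of dicts like {"offset": int, ...} standing in
    for pending AssetEvent rows in the metadata DB.
    """
    new_offset = max(kafka_offset, skip_up_to + 1)
    surviving = [e for e in pending_events if e["offset"] > skip_up_to]
    return new_offset, surviving
```

With a primitive like this, "skip the bad record" would mean one operation against one source of truth, instead of an offset reset on one side and orphaned events on the other.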


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
