spencerhuang commented on issue #64205: URL: https://github.com/apache/airflow/issues/64205#issuecomment-4135596134
> > Stale AssetEvent rows in Airflow's Postgres Metastore were separate from Kafka offsets.
>
> Can you expand on this a little bit more here? What do you mean by "separate"?
>
> I'll preface this by saying that I am NOT a Kafka expert by any means. But for all `MessageQueueTriggers` (including Kafka), the logic is quite simple. When a message lands in the message queue an `AssetEvent` is created, triggering a downstream DAG (or DAGs). What's really being executed is this code (https://github.com/apache/airflow/blob/main/providers/apache/kafka/src/airflow/providers/apache/kafka/triggers/await_message.py#L106-L143), just checking for a simple event.

@jroachgolf84 Thank you for the quick response. Let me provide some context on how I use Kafka in conjunction with Airflow.

In Kafka, a "poison pill" is a message that consistently fails to be processed, often because it is corrupted, causing the consumer to get "stuck" at the failing offset. To prevent a single message from stopping the entire data pipeline, we use a Dead Letter Queue (DLQ): a separate, dedicated Kafka topic to which the consumer automatically reroutes these problematic messages after a few failed attempts. This allows the main pipeline to continue running smoothly while isolating the "poison" data for manual inspection and fixing later.

To address this classic, common Kafka issue, here is the use case:

```python
# Event-driven DAG triggered by the cdc_events Asset:
with DAG(
    dag_id="cdc_processor",
    schedule=[cdc_asset],  # Triggered by AssetEvents
    ...
) as dag:
    result = process_and_trigger()    # @task (retries=3, exp backoff)
    handle_dlq_task = handle_dlq()    # @task (trigger_rule=ONE_FAILED)
    result >> handle_dlq_task
```

- `process_and_trigger`: reads the event payload from `context["triggering_asset_events"]`, queries the data from a business DB, builds the DAG run conf, starts the on-demand DAG via the Airflow REST API, and optionally emits an audit event.
- `handle_dlq`: fires when `process_and_trigger` fails after 3 retries. Persists the failed message to the `dead_letter_messages` table and the Kafka DLQ topic.

**Now, where the two states diverge (the "separation")**

A malformed Kafka message causes performance degradation or a crash in the Triggerer before a DLQ write can occur. To restore service, an on-call engineer will naturally perform a Kafka offset reset (current + 1) to skip the bad record. However, because the AssetWatcher may have already partially emitted AssetEvents into Postgres before the stall/crash, the offset reset is insufficient: Airflow will still "phantom-process" those ghost events from its own database, even though the operator explicitly signaled the system to skip that data. We now have two separate, competing sources of truth (Kafka offsets vs. Airflow Asset state) that are fundamentally out of sync.

This architectural separation also creates problems during recovery. If an operator resets Kafka to `latest` to shed load during a lag spike or a slowdown in Airflow, they expect the system to stop processing old data. Instead, Airflow continues to churn through the backlog of pending AssetEvents stored in the metadata DB. Without a unified state, we lose the ability to perform a point-in-time recovery: resetting one does not reset the other, making a clean "replay" of Kafka events difficult to coordinate.

By decoupling the ingestion state from the orchestration state, we have traded operational simplicity for a system where the left hand (Kafka) no longer knows what the right hand (Airflow) is doing. I hope this clarifies why the current separation is a significant concern for production-grade, event-driven pipelines.
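For illustration, the retry-then-dead-letter routing that `process_and_trigger`/`handle_dlq` implement can be sketched in plain Python. This is a minimal sketch with no Airflow or Kafka dependency; `process_with_dlq`, `parse`, and the DLQ record shape are all invented names for this example, not the actual implementation:

```python
# Illustrative sketch of the retry-then-dead-letter pattern described above.
# All names here are hypothetical; this is not Airflow or Kafka client code.

def process_with_dlq(message, process, max_retries=3):
    """Try `process(message)` up to max_retries times.

    Returns ("ok", result) on success, or ("dlq", record) when the
    message is a poison pill that keeps failing, so the caller can
    persist it to a dead-letter topic/table and move past the offset.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return ("ok", process(message))
        except Exception as exc:  # real code would catch narrower errors
            last_error = exc
    # Retries exhausted: route to the DLQ instead of blocking the pipeline.
    dlq_record = {
        "payload": message,
        "error": str(last_error),
        "attempts": max_retries,
    }
    return ("dlq", dlq_record)


def parse(message):
    # Stand-in for the real processing step; fails on malformed payloads.
    if message == "corrupted":
        raise ValueError("cannot decode payload")
    return message.upper()
```

A healthy message returns `("ok", ...)` on the first attempt, while a poison pill exhausts its retries and comes back as a `("dlq", ...)` record that the pipeline can persist and skip, mirroring the `retries=3` / `trigger_rule=ONE_FAILED` pairing in the DAG above.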
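The divergence itself can be made concrete with a toy model. Everything here is invented for illustration (`PipelineState` and its methods are not real Airflow or Kafka structures); the point is only that resetting the consumer offset touches one store while the persisted AssetEvents live in another:

```python
# Toy model of the two competing "sources of truth" described above.
# Names are invented for illustration; these are not real Airflow/Kafka APIs.

class PipelineState:
    def __init__(self):
        self.kafka_offset = 0           # source of truth #1: broker-side offset
        self.pending_asset_events = []  # source of truth #2: rows in the metadata DB

    def watcher_poll(self, messages):
        """AssetWatcher: consume messages and persist AssetEvents."""
        for msg in messages:
            self.pending_asset_events.append(msg)  # row written to Postgres
            self.kafka_offset += 1                 # offset advances

    def reset_offset(self, new_offset):
        """Operator action (e.g. an offset reset to skip a bad record).

        Note: this touches ONLY the broker-side state."""
        self.kafka_offset = new_offset

    def scheduler_drain(self):
        """The scheduler triggers DAG runs from the *database*, not from Kafka."""
        drained, self.pending_asset_events = self.pending_asset_events, []
        return drained
```

Walking through the failure scenario: the watcher polls `["m1", "poison", "m3"]`, the operator resets the offset to skip the bad record, yet `scheduler_drain()` still returns all three messages, including the poison pill, because the AssetEvent rows were never cleared. That is the "phantom-processing" of ghost events.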
