suryaprasanna opened a new issue, #18298:
URL: https://github.com/apache/hudi/issues/18298
### Feature Description
**What the feature achieves:**
This feature would enable automatic collection and tracking of data
freshness metrics for Hudi tables by exposing lineage information from write
operations.
Key capabilities:
- Automatic freshness metrics collection based on upstream sources (Kafka
topics, other datasets, etc.)
- Partition-level (date-wise) freshness metrics based on the partitions read or
written; applicable only to date-based partitions in the initial phase
- Commit-level freshness metrics, from which the true freshness of the dataset
can be calculated (future phase)
- Lineage information returned from write operations that can be consumed by
external freshness tracking services
- Unified freshness tracking across both Hudi and traditional Hive datasets
in the data lake
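To make the commit-level metrics above concrete, here is a minimal sketch of what a per-commit freshness record could look like. The `FreshnessMetrics` type, its field names, and the lag calculation are hypothetical illustrations for this proposal, not part of any existing Hudi API; the commit time format follows Hudi's `yyyyMMddHHmmss` convention and is assumed to be UTC.

```scala
import java.time.format.DateTimeFormatter
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}

// Hypothetical container for per-commit freshness/lineage metadata.
case class FreshnessMetrics(
    tableName: String,
    commitTime: String,           // Hudi-style commit timestamp, yyyyMMddHHmmss
    upstreamSources: Seq[String], // e.g. Kafka topics or upstream tables
    freshnessTimestamp: Instant   // event time of the newest data written
) {
  private val commitFmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // Instant at which the commit completed (assuming UTC commit times).
  def commitInstant: Instant =
    LocalDateTime.parse(commitTime, commitFmt).toInstant(ZoneOffset.UTC)

  // "True freshness" lag: how far the commit trails the newest event it wrote.
  def lag: Duration = Duration.between(freshnessTimestamp, commitInstant)
}

val m = FreshnessMetrics(
  "my_hudi_table",
  "20260309120000",
  Seq("kafka_topic_xyz"),
  Instant.parse("2026-03-09T11:55:00Z"))

println(m.lag.toMinutes) // prints 5
```

A record like this could be emitted once per commit and aggregated by an external service to compute dataset-level freshness.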
**Why this feature is needed:**
A data lake typically contains multiple table formats, such as Hudi, Iceberg,
and traditional Hive datasets, and we need a way to track data freshness across
all of them. Currently:
- No built-in mechanism: there is no way to collect freshness values for
Hudi datasets in the data lake.
- Missing lineage information: write operations (the `df.write` API, INSERT
OVERWRITE, INSERT INTO, MERGE INTO) return `Seq.empty[Row]` without providing
any lineage or freshness metadata to engines like Spark.
- Manual tracking is not scalable: without automatic collection, tracking
freshness across multiple datasets is error-prone and does not scale.
- No integration points: external freshness services cannot automatically
receive updates from Hudi write operations.
### User Experience
**How users will use this feature:**
- Configuration changes needed: a new write option (e.g.
`hoodie.freshness.tracking.enabled`) to opt in to freshness tracking
- API changes: write operations return lineage/freshness rows instead of
`Seq.empty[Row]`
- Current behavior:
```
// InsertIntoHoodieTableCommand and similar operations return empty results
val result: Seq[Row] = Seq.empty[Row]
```
- Proposed behavior:
```
// Return lineage and freshness information (illustrative schema).
// Columns: table_name, commit_time, upstream_sources,
//          freshness_timestamp, record_count
val result: Seq[Row] = Seq(
  Row(
    "my_hudi_table",                                // table_name
    "20260309120000",                               // commit_time
    Array("kafka_topic_xyz", "upstream_table_abc"), // upstream_sources
    "2026-03-09T12:00:00Z",                         // freshness_timestamp
    1000000L                                        // record_count
  )
)
```
- Usage examples
Example 1: Spark DataFrame Write API
```
val df = spark.read.format("kafka")...

// Write to Hudi table
val result = df.write
  .format("hudi")
  .option("hoodie.freshness.tracking.enabled", "true")
  .save("path/to/hudi/table") // proposed: returns lineage rows instead of Unit

// Consume freshness information
result.foreach { row =>
  val freshnessInfo = row.getAs[Map[String, Any]]("lineage_info")
  freshnessService.update(freshnessInfo)
}
```
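The `freshnessService` referenced above is left abstract in this proposal. One plausible shape for it, sketched below purely for illustration (the `FreshnessService` class and its methods are hypothetical, not an existing API), is a client that keeps the newest freshness timestamp seen per table; a real implementation would push these updates to an external tracking service over RPC or HTTP.

```scala
import scala.collection.mutable

// Hypothetical client for an external freshness-tracking service.
// It records the newest freshness timestamp observed per table and
// ignores out-of-order (older) updates.
class FreshnessService {
  private val latest = mutable.Map.empty[String, String]

  def update(info: Map[String, Any]): Unit = {
    val table = info("table_name").toString
    val ts = info("freshness_timestamp").toString
    // ISO-8601 timestamps sort correctly as plain strings.
    if (latest.get(table).forall(_ < ts)) latest(table) = ts
  }

  def freshnessOf(table: String): Option[String] = latest.get(table)
}

val freshnessService = new FreshnessService
freshnessService.update(Map(
  "table_name" -> "my_hudi_table",
  "freshness_timestamp" -> "2026-03-09T12:00:00Z"))
freshnessService.update(Map(
  "table_name" -> "my_hudi_table",
  "freshness_timestamp" -> "2026-03-09T11:00:00Z")) // older update, ignored
```

Keeping only the latest timestamp per table makes the service idempotent with respect to replayed or out-of-order write notifications.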
Example 2: Spark SQL
```
INSERT INTO hudi_table
SELECT * FROM kafka_source;

-- Freshness info automatically sent to configured service
-- or available via Spark's QueryExecution
```
### Hudi RFC Requirements
**RFC PR link:** (if applicable)
**Why RFC is/isn't needed:**
- Does this change public interfaces/APIs? (Yes/No)
- Does this change storage format? (Yes/No)
- Justification:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]