codope commented on issue #18298:
URL: https://github.com/apache/hudi/issues/18298#issuecomment-4459502449
@yihua @suryaprasanna Following up on our offline discussion.
PR #18726 adds `innerChildren` to the hudi-spark SQL commands and an analyzer
rule that attaches `catalogTable` to path-based incremental/CDC reads. That PR is
intentionally minimal: it makes Hudi's analyzed Spark plans introspectable so
that the OpenLineage Spark (OL-Spark) integration (and any other Catalyst plan
walker) can extract lineage without compile-time coupling to Hudi internals.
For Spark SQL, that should be enough. OL-Spark already hooks into
`QueryExecutionListener` and walks `QueryExecution.executedPlan`. Once Hudi
commands expose `innerChildren`, OL-Spark's existing `MergeIntoTable`,
`UpdateTable` and `DeleteFromTable` visitors (used today for Delta and Iceberg)
will start producing lineage edges for Hudi too. The same applies to Spark
DataFrame writes (already covered by OL-Spark's `SaveIntoDataSourceCommand`
handling) and to Structured Streaming.
I want to make a separate case for going beyond this fix and closing gaps that
engine-side listeners structurally cannot close, such as:
1. Hudi-internal operations like compaction, clustering, and rollback are
first-class events on the Hudi timeline that no engine query produces. I think
compaction and clustering don't affect lineage, but rollback is a real lineage
event ("the data committed at instant T1 has been removed") that downstream
consumers may need to know about.
2. Multi-engine, engine-agnostic lineage. The patch only makes Spark plans
introspectable, and we could do something similar for Flink. But each engine's
lineage agent (OL-Spark, OL-Flink, etc.) constructs its own dataset identifier
from what it can see locally (catalog name, base path, options). As a result,
lineage for one Hudi table looks different depending on which engine wrote it.
It would be nicer to define an engine-agnostic lineage event payload that is
fired on the commit callback, plus a lineage reporter interface that can be
implemented for any lineage backend. A Hudi-side hook is the only place where
one canonical lineage event payload can be asserted. The idea is similar to the
`MetricsReporter` pattern we already have in Hudi.
3. Engine-side listeners see the attempt but not the result. Extracting lineage
solely from the plan is best-effort, because the execution of the plan may have
failed. Hudi also amends the plan for some commands (like `MERGE INTO`), which
can affect lineage. Emitting a lineage event in the commit callback is closer to
the truth. It also lets us include more information, such as post-dedup file
groups affected and partitions touched, in addition to table and column lineage
from the plan. Some of this exists in commit metadata but is invisible to the
engine listener.
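For item 2, a Hudi-side hook could assert a single identity by deriving it from the table's base path alone. Engine-local catalog names and options would be ignored. Below is a minimal Java sketch. It assumes a namespace/name split loosely modeled on OpenLineage's dataset naming: namespace is `scheme://authority` and name is the path. `DatasetId` and `fromBasePath` are hypothetical names, not an existing Hudi or OpenLineage API.

```java
import java.net.URI;
import java.util.Locale;

/** Hypothetical engine-neutral dataset identity: namespace = scheme://authority, name = path. */
record DatasetId(String namespace, String name) {

    /** Derives the identity from the base path only, so every engine computes the same value. */
    static DatasetId fromBasePath(String basePath) {
        // normalize() resolves "." and ".." path segments.
        URI uri = URI.create(basePath.trim()).normalize();
        if (uri.getScheme() == null) {
            // Without a scheme two engines could resolve the path against different file systems.
            throw new IllegalArgumentException("Base path must be fully qualified: " + basePath);
        }
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
        String path = uri.getPath();
        // Drop trailing slashes so "s3://b/t" and "s3://b/t/" identify the same table.
        while (path.length() > 1 && path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        String namespace = uri.getScheme().toLowerCase(Locale.ROOT) + "://" + authority;
        return new DatasetId(namespace, path);
    }
}
```

The value depends only on the base path. A Spark writer and a Flink writer targeting `s3://bucket/warehouse/trips` would therefore report the same `DatasetId`, whatever catalog name each engine used.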
### Very, very high-level proposal
As I said, the pattern that fits Hudi's existing conventions is a
`LineageReporter` interface modeled directly on `MetricsReporter`: a neutral
interface in core. Backend implementations could be in optional new modules,
e.g. hudi-openlineage (any OL dependency is scoped to this module only and does
not affect hudi core).
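```java
public interface LineageReporter {
  /** Called once per successful commit, after the commit completes. */
  void onCommit(LineageEvent event);

  /** Releases backend resources (clients, connections). */
  default void close() {}
}
```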
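Because the interface is modeled on `MetricsReporter`, reporters could be instantiated from configuration by class name. That way core never depends on a backend module at compile time. The sketch below repeats the interface so it compiles on its own and uses a minimal stand-in for `LineageEvent`. The config key `hoodie.lineage.reporter.classes` and `LineageReporterFactory` are hypothetical; neither exists in Hudi today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Minimal stand-in for the full payload. */
record LineageEvent(String commitInstant) {}

interface LineageReporter {
    void onCommit(LineageEvent event);

    default void close() {}
}

/** Example backend that records and prints events (stands in for, e.g., an OpenLineage module). */
final class LoggingLineageReporter implements LineageReporter {
    final List<String> seenInstants = new ArrayList<>();

    public LoggingLineageReporter() {}

    @Override
    public void onCommit(LineageEvent event) {
        seenInstants.add(event.commitInstant());
        System.out.println("lineage event for instant " + event.commitInstant());
    }
}

/** Instantiates reporters by class name, so core has no compile-time dependency on any backend. */
final class LineageReporterFactory {
    /** Hypothetical config key: comma-separated reporter class names. */
    static final String REPORTER_CLASSES = "hoodie.lineage.reporter.classes";

    private LineageReporterFactory() {}

    static List<LineageReporter> create(Properties props) {
        List<LineageReporter> reporters = new ArrayList<>();
        String classes = props.getProperty(REPORTER_CLASSES, "").trim();
        if (classes.isEmpty()) {
            return reporters; // lineage is off unless explicitly configured
        }
        for (String className : classes.split(",")) {
            String name = className.trim();
            try {
                Object instance = Class.forName(name).getDeclaredConstructor().newInstance();
                reporters.add((LineageReporter) instance);
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load lineage reporter: " + name, e);
            }
        }
        return reporters;
    }
}
```

With the key unset, the factory returns no reporters, so lineage stays off by default.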
In addition to the reporter interface, we need a `LineageEvent` payload
(commit instant, operation type, output dataset, optional inputs, optional
column mappings, extra metadata). It deliberately does not reference any
specific lineage spec. Reporters translate it to whatever their backend speaks,
e.g. an OpenLineage RunEvent, an Atlas EntityNotification, or a custom internal
format. We also need a `HoodiePlanLineageExtractor` interface, with per-engine
implementations living in engine-specific modules such as hudi-spark-common for
Spark. Each implementation parses the engine's plan and builds the lineage
context.
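Here is a minimal sketch of the payload and extractor as Java records and an interface. It assumes string dataset identifiers and a simple output-column-to-input-columns mapping for column lineage. `ColumnMapping`, `LineageContext`, and the field names are illustrative assumptions. The type parameter `P` stands for whatever plan type an engine exposes; for Spark, presumably a Catalyst `LogicalPlan`.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** One column-level edge: an output column derived from fully qualified input columns. */
record ColumnMapping(String outputColumn, List<String> inputColumns) {}

/** What an engine-specific extractor can recover from the analyzed plan. */
record LineageContext(List<String> inputDatasets, List<ColumnMapping> columnMappings) {
    static LineageContext empty() {
        return new LineageContext(List.of(), List.of());
    }
}

/** Engine-neutral payload; deliberately free of any lineage-spec types. */
record LineageEvent(
        String commitInstant,
        String operationType,
        String outputDataset,
        List<String> inputDatasets,          // empty when inputs are unknown
        List<ColumnMapping> columnMappings,  // empty when column lineage is unavailable
        Map<String, String> extraMetadata) {

    LineageEvent {
        Objects.requireNonNull(commitInstant, "commitInstant");
        Objects.requireNonNull(operationType, "operationType");
        Objects.requireNonNull(outputDataset, "outputDataset");
        // Immutable copies: reporters cannot mutate an event shared with other reporters.
        inputDatasets = List.copyOf(inputDatasets);
        columnMappings = List.copyOf(columnMappings);
        extraMetadata = Map.copyOf(extraMetadata);
    }
}

/** Implemented once per engine module against that engine's plan type P. */
interface HoodiePlanLineageExtractor<P> {
    LineageContext extract(P plan);
}
```

Empty lists, rather than `null`, signal "unknown", which keeps every reporter free of null checks.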
Finally, the hook point is `BaseHoodieWriteClient.commit(...)`: post-commit and
best-effort, identical in spirit to how `HoodieWriteCommitCallback` already
fires. This is where we can enrich the lineage context with commit-level
information such as commit time, partitions touched, and file groups affected.
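The post-commit hook could look roughly like the sketch below. It merges the plan-derived inputs with commit-level facts (partitions touched, file groups affected) and calls every reporter. Reporter failures are caught and logged, so a lineage backend outage never fails a write that has already committed. `CommitInfo`, `LineageDispatcher`, and this trimmed `LineageEvent` are hypothetical. In Hudi the partition and file-group data would presumably come from the commit metadata.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Commit-level facts known only after the write completes (hypothetical shape). */
record CommitInfo(String instant, String operationType, String basePath,
                  Map<String, List<String>> partitionToFileIds) {}

/** Trimmed payload: plan-derived inputs plus commit-derived partitions and file groups. */
record LineageEvent(String commitInstant, String operationType, String outputDataset,
                    List<String> inputDatasets, List<String> partitionsTouched,
                    long fileGroupsAffected) {}

interface LineageReporter {
    void onCommit(LineageEvent event);

    default void close() {}
}

final class LineageDispatcher {
    private final List<LineageReporter> reporters;

    LineageDispatcher(List<LineageReporter> reporters) {
        this.reporters = List.copyOf(reporters);
    }

    /** Invoked only after the commit is complete; never throws back into the write path. */
    void afterCommit(CommitInfo commit, List<String> planInputs) {
        Map<String, List<String>> written = commit.partitionToFileIds();
        // A file group is identified within its partition, so count distinct IDs per partition.
        long fileGroups = written.values().stream()
                .mapToLong(ids -> ids.stream().distinct().count())
                .sum();
        LineageEvent event = new LineageEvent(
                commit.instant(), commit.operationType(), commit.basePath(),
                List.copyOf(planInputs),
                List.copyOf(new TreeSet<>(written.keySet())), // sorted for stable output
                fileGroups);
        for (LineageReporter reporter : reporters) {
            try {
                reporter.onCommit(event);
            } catch (RuntimeException e) {
                // Best-effort: the commit already succeeded, so a failing backend is only logged.
                System.err.println("Lineage reporter " + reporter.getClass().getName() + " failed: " + e);
            }
        }
    }
}
```

A reporter that throws, such as one simulating a backend outage, is logged and skipped. The remaining reporters still receive the same event.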
I know this is still very hand-wavy. Happy to expand any of these into a
proper RFC once there's directional agreement. The open questions, as I see
them, are:
1. Is this valuable enough to add to Hudi, and is it even Hudi's concern? I
think Hudi is more than a table format, so in my opinion there's definitely
value for teams that care about data governance.
2. If we add this feature, who maintains the optional backend modules? They
still need care in every release. I think we can provide one out-of-the-box
implementation and own it (OpenLineage, since it is widely adopted), and invite
the community to add more, just like with `MetricsReporter`.
Please let me know what you think and whether there are other concerns
around the proposal.