pvary commented on PR #11497:
URL: https://github.com/apache/iceberg/pull/11497#issuecomment-2519720233
To summarise the state, I see the following open questions:
- State handling and restoration
- How to handle table schema, specification, configuration changes
- How to handle errors
**State handling and restoration**
What we know:
- There will be no issues with aligned checkpoints: the checkpoint barriers are
  aligned with the triggers, so every event is processed within a single
  checkpoint
- With unaligned checkpoints:
  - The state of the compaction job is not critical, as the user can always
    trigger a new compaction and drop the old pending results
  - There are good safeguards against committing the same compaction results
    twice: if a replaced file no longer exists in the table, the commit fails
  - If we want to use the core CommitManager strategy (which handles partial
    commits), then we can’t guarantee consistency between the state and the
    Iceberg table, since the CommitManager uses its own thread to do the actual
    commits. The Flink folks might come up with a solution here that I am not
    currently aware of
  - If we restore an old checkpoint or savepoint, we are almost guaranteed to
    fail to commit the inProgress records, as they were most probably already
    committed by the previous run of the job
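To illustrate the double-commit safeguard mentioned above, here is a minimal in-memory sketch. It assumes a toy table that only tracks its live data file paths. `ToyTable` and `commitRewrite` are hypothetical names, not the Iceberg API; in Iceberg the equivalent check is part of the rewrite commit itself. Replaying the same rewrite result, as would happen after restoring an old checkpoint, is rejected because the replaced files are already gone.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy, in-memory model (hypothetical, not the Iceberg API) of the safeguard:
 * a rewrite commit is rejected if any file it replaces is no longer live.
 */
final class ToyTable {
    private final Set<String> liveFiles = new HashSet<>();

    ToyTable(List<String> initialFiles) {
        liveFiles.addAll(initialFiles);
    }

    /** Replaces {@code replaced} with {@code added}, or fails without changing anything. */
    void commitRewrite(Set<String> replaced, Set<String> added) {
        // Validate everything first so a failed commit leaves the table untouched.
        for (String file : replaced) {
            if (!liveFiles.contains(file)) {
                // Already rewritten (e.g. by a previous run of the job): a double commit.
                throw new IllegalStateException("Cannot commit, missing data file: " + file);
            }
        }
        liveFiles.removeAll(replaced);
        liveFiles.addAll(added);
    }

    Set<String> liveFiles() {
        return Set.copyOf(liveFiles);
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable(List.of("a.parquet", "b.parquet"));
        Set<String> replaced = Set.of("a.parquet", "b.parquet");
        Set<String> added = Set.of("ab.parquet");

        table.commitRewrite(replaced, added); // the first commit succeeds
        try {
            table.commitRewrite(replaced, added); // replaying the same result fails
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println(table.liveFiles()); // [ab.parquet]
    }
}
```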
The following ideas have emerged so far for handling the situation:
1. Use the core CommitManager, store the state, and make a best-effort attempt
   to commit the inProgress records
   - Helps if the job stopped ungracefully (without calling close on the
     DataFileRewriteCommitter)
   - Generates errors on start (gracefully handled)
   - Generates errors when receiving groups stored in the wire state
     (gracefully handled)
2. Create our own strategy for committing
   - Needs an elaborate scheme to store the group ids in the commits
   - Before adding a new group, we need to check the table to see whether the
     group is already committed, either with a cache or with continuous checks
   - We lose the features provided by the core CommitManager (group handling
     and parallel commits)
3. Remove state from the Compaction Task
   - Simplest implementation
   - We might lose some in-flight work: nothing on a graceful stop, but some
     work may be lost when a job crashes
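Option 2 could look roughly like the following sketch. It assumes each commit records its rewrite group ids in its snapshot summary. The summary key `compaction.group-ids`, the `GroupAwareCommitter` class and the in-memory snapshot history are all hypothetical; a real implementation would read the snapshots from the Iceberg table. The sketch uses the cache variant: the history is scanned once and the cache is then kept up to date locally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: every commit records its group ids in the snapshot
 * summary, and groups recorded by an earlier commit are skipped.
 */
final class GroupAwareCommitter {
    // Hypothetical summary key, not an existing Iceberg property.
    static final String GROUP_IDS_KEY = "compaction.group-ids";

    /** Minimal stand-in for a snapshot: only its summary properties. */
    record Snapshot(Map<String, String> summary) {}

    private final List<Snapshot> history = new ArrayList<>();
    // Cache of committed group ids, built from the history on first use.
    private Set<String> committedCache;

    private Set<String> committedGroupIds() {
        if (committedCache == null) {
            committedCache = new HashSet<>();
            for (Snapshot snapshot : history) {
                String ids = snapshot.summary().get(GROUP_IDS_KEY);
                if (ids != null && !ids.isEmpty()) {
                    // Assumes group ids never contain commas.
                    committedCache.addAll(Arrays.asList(ids.split(",")));
                }
            }
        }
        return committedCache;
    }

    /** Commits the groups not committed yet and returns the ids actually committed. */
    List<String> commit(List<String> groupIds) {
        List<String> pending = new ArrayList<>();
        for (String id : groupIds) {
            if (!committedGroupIds().contains(id)) {
                pending.add(id);
            }
        }
        if (!pending.isEmpty()) {
            history.add(new Snapshot(Map.of(GROUP_IDS_KEY, String.join(",", pending))));
            committedGroupIds().addAll(pending);
        }
        return pending;
    }

    public static void main(String[] args) {
        GroupAwareCommitter committer = new GroupAwareCommitter();
        System.out.println(committer.commit(List.of("g1", "g2"))); // [g1, g2]
        // After restoring an old checkpoint, g2 is replayed together with a new group.
        System.out.println(committer.commit(List.of("g2", "g3"))); // [g3]
    }
}
```

The cache only sees commits made through this instance. If other writers might commit the same groups, the continuous-check variant would be needed instead, rescanning new snapshots before every commit. Committing serially from one place like this is also where the parallel commits of the core CommitManager are lost.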
**How to handle table schema, specification, configuration changes**
We should decide whether we want to handle table
schema/specification/configuration changes.
1. We could forgo those changes and use the values available when the job
   started (like every current Flink job)
   - We need to add guardrails to prevent compaction when the table has changes
     that could cause issues, such as losing data from new columns when reading
     new files with the old schema
2. We could keep the schema/specification/configuration up-to-date. For this we
   have the following options:
   a. Generate a SerializableTable object at planning time, and send it over the
      wire to the DataFileRewriteExecutors
      - The size of the serialized table could increase network traffic
      - There is an issue with S3FileIO serialization which needs to be fixed
   b. Send only the snapshotId to the DataFileRewriteExecutors and load the
      table on every executor
      - Adds extra load to the Catalog, as every executor needs to load the
        table once for every trigger
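For option 1, the guardrail could be a simple comparison between the metadata captured when the job started and the current table metadata, checked before each compaction. This is a minimal sketch with hypothetical names. In a real job the ids could come from something like `table.schema().schemaId()` and `table.spec().specId()`. Which changes are actually harmful would need a closer look; the sketch conservatively blocks compaction on any change.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical guardrail: the job keeps the table metadata it saw at startup
 * and refuses to compact once the table has moved on.
 */
final class CompactionGuardrail {
    /** The parts of the table metadata the job relies on (simplified). */
    record TableMetadataView(int schemaId, int specId, Map<String, String> properties) {}

    private final TableMetadataView atJobStart;

    CompactionGuardrail(TableMetadataView atJobStart) {
        this.atJobStart = Objects.requireNonNull(atJobStart);
    }

    /** Returns a reason to skip compaction, or empty if it is considered safe. */
    Optional<String> checkBeforeCompaction(TableMetadataView current) {
        if (current.schemaId() != atJobStart.schemaId()) {
            // Rewriting newer files with the old schema could drop data from new columns.
            return Optional.of("schema changed: " + atJobStart.schemaId() + " -> " + current.schemaId());
        }
        if (current.specId() != atJobStart.specId()) {
            return Optional.of("partition spec changed: " + atJobStart.specId() + " -> " + current.specId());
        }
        // Conservative: any table property change blocks compaction.
        if (!current.properties().equals(atJobStart.properties())) {
            return Optional.of("table properties changed");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CompactionGuardrail guardrail =
                new CompactionGuardrail(new TableMetadataView(0, 0, Map.of()));
        System.out.println(guardrail.checkBeforeCompaction(new TableMetadataView(0, 0, Map.of())));
        System.out.println(guardrail.checkBeforeCompaction(new TableMetadataView(1, 0, Map.of())));
    }
}
```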
**How to handle errors**
We have two options for propagating errors from the individual operators:
1. Use a side output to emit the error and aggregate it in a final operator
   - Better separation of concerns
2. Send a Pair<Data, Exception> as the output and handle the exceptions in
   every operator
   - Simpler flow
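The difference between the two options can be sketched without Flink. In a real job, option 1 would probably use an `OutputTag` together with `ProcessFunction.Context#output`; here a plain list stands in for the side output. `Result`, `apply` and `applyWithSideOutput` are hypothetical names chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java sketch (no Flink dependency) of the two error propagation options. */
final class ErrorPropagationSketch {

    /** Option 2: every record carries either a result or the failure that replaced it. */
    record Result<T>(T data, Exception error) {
        static <T> Result<T> ok(T data) { return new Result<>(data, null); }
        static <T> Result<T> failed(Exception error) { return new Result<>(null, error); }
        boolean isFailed() { return error != null; }
    }

    /** Option 2: each operator must forward upstream failures untouched. */
    static <I, O> Result<O> apply(Result<I> input, Function<I, O> operator) {
        if (input.isFailed()) {
            return Result.failed(input.error());
        }
        try {
            return Result.ok(operator.apply(input.data()));
        } catch (RuntimeException e) {
            return Result.failed(e);
        }
    }

    /** Option 1: only successes go downstream; errors go to a separate channel. */
    static <I, O> List<O> applyWithSideOutput(
            List<I> inputs, Function<I, O> operator, List<Exception> sideOutput) {
        List<O> main = new ArrayList<>();
        for (I input : inputs) {
            try {
                main.add(operator.apply(input));
            } catch (RuntimeException e) {
                sideOutput.add(e); // aggregated later by a final operator
            }
        }
        return main;
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = Integer::parseInt;

        List<Exception> errors = new ArrayList<>();
        List<Integer> parsed = applyWithSideOutput(List.of("1", "x", "3"), parse, errors);
        System.out.println(parsed + ", errors: " + errors.size()); // [1, 3], errors: 1

        Result<Integer> result = apply(Result.ok("x"), parse);
        System.out.println(result.isFailed()); // true
    }
}
```

With option 2, every operator needs the pass-through branch at the top of `apply`. That is the price of the simpler flow. Option 1 keeps operators unaware of upstream failures.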
Another, orthogonal question regarding the exceptions is what to propagate:
1. Propagate the whole exception
   - We have the full stack trace available at the aggregation point
   - Might cause state issues if the exception class changes
2. Propagate only the exception message and log the exception locally
   - Less network traffic
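The second variant ("message only") could look like this minimal sketch with hypothetical names. The operator logs the full exception with `java.util.logging`, standing in for whatever logger the job uses. Only a small string record travels downstream, so the state never contains exception classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch: the operator logs the full exception locally and only
 * a short string record travels downstream.
 */
final class MessageOnlyErrors {
    private static final Logger LOG = Logger.getLogger(MessageOnlyErrors.class.getName());

    /** Sent downstream instead of the exception; plain strings keep the state format stable. */
    record ErrorMessage(String operator, String message) {}

    static ErrorMessage report(String operator, Exception e) {
        // The full stack trace is only available in this task's log.
        LOG.log(Level.WARNING, "Failure in operator " + operator, e);
        return new ErrorMessage(operator, e.getClass().getName() + ": " + e.getMessage());
    }

    /** The final aggregation point only ever sees the messages. */
    static String aggregate(List<ErrorMessage> errors) {
        List<String> lines = new ArrayList<>();
        for (ErrorMessage error : errors) {
            lines.add(error.operator() + " -> " + error.message());
        }
        return String.join("\n", lines);
    }

    public static void main(String[] args) {
        List<ErrorMessage> errors = List.of(
                report("DataFileRewriteExecutor", new IllegalStateException("file not found")));
        // Prints: DataFileRewriteExecutor -> java.lang.IllegalStateException: file not found
        System.out.println(aggregate(errors));
    }
}
```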
My current preferences:
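- **State handling and restoration**: 3 (remove state from the Compaction Task)
- **How to handle table schema, specification, configuration changes**: 2/a
  (keep them up-to-date by sending a SerializableTable to the executors)
- **How to handle errors**: 1, 1 (side output aggregated in a final operator;
  propagate the whole exception)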