mxm commented on code in PR #15042:
URL: https://github.com/apache/iceberg/pull/15042#discussion_r2720595141
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java:
##########
@@ -226,21 +258,43 @@ public void append() throws IOException {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
- DataStream<Trigger> triggers =
- DataStreamUtils.reinterpretAsKeyedStream(
- changeStream(tableName, loader), unused -> true)
- .process(
- new TriggerManager(
- loader,
- lockFactory,
- taskNames,
- evaluators,
- rateLimit.toMillis(),
- lockCheckDelay.toMillis()))
- .name(TRIGGER_MANAGER_OPERATOR_NAME)
- .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel()
+ DataStream<Trigger> triggers;
+ if (lockFactory == null) {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .transform(
+ TRIGGER_MANAGER_OPERATOR_NAME,
+ TypeInformation.of(Trigger.class),
+ new TriggerManagerOperatorFactory(
+ tableName,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
+ } else {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .process(
+ new TriggerManager(
+ loader,
+ lockFactory,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .name(TRIGGER_MANAGER_OPERATOR_NAME)
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
Review Comment:
>1. TriggerManager and LockRemover are both related to ProcessFunction,
> while TriggerManagerOperator and LockRemoverOperator both need to be wired in
> via transform. Their APIs are different at the Flink layer, so I didn’t combine
> them.
I don't think that is a blocker: `KeyedStream.process(...)` internally uses
`transform` as well.
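In Flink's DataStream API, `KeyedStream.process(...)` wraps the function in an operator (a `KeyedProcessOperator`) and then calls `transform(...)`. The function-based and operator-based wiring styles therefore converge on the same entry point. The sketch below is a plain-Java toy model of that delegation, not the Flink API. `ToyStream`, `ProcessFn`, `Operator`, and `ProcessOperator` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT the Flink API: all class and method names here are hypothetical.
// It only illustrates how a function-based process(...) can delegate to an
// operator-based transform(...), so both wiring styles share one code path.
public class ProcessViaTransform {

  // Low-level building block: consumes one element and emits zero or more outputs.
  interface Operator<IN, OUT> {
    void processElement(IN value, List<OUT> out);
  }

  // User-facing callback, loosely analogous to a ProcessFunction.
  interface ProcessFn<IN, OUT> {
    void process(IN value, List<OUT> out);
  }

  // Adapter that turns a callback into an operator (in Flink, KeyedStream.process
  // wraps the function in a KeyedProcessOperator before calling transform).
  static final class ProcessOperator<IN, OUT> implements Operator<IN, OUT> {
    private final ProcessFn<IN, OUT> fn;

    ProcessOperator(ProcessFn<IN, OUT> fn) {
      this.fn = fn;
    }

    @Override
    public void processElement(IN value, List<OUT> out) {
      fn.process(value, out);
    }
  }

  // A bounded, in-memory stream with a name, evaluated eagerly for simplicity.
  static final class ToyStream<T> {
    final String name;
    final List<T> elements;

    ToyStream(String name, List<T> elements) {
      this.name = name;
      this.elements = elements;
    }

    // Operator-based wiring: run every element through the operator.
    <R> ToyStream<R> transform(String operatorName, Operator<T, R> operator) {
      List<R> out = new ArrayList<>();
      for (T element : elements) {
        operator.processElement(element, out);
      }
      return new ToyStream<>(operatorName, out);
    }

    // Function-based wiring: wrap the callback, then delegate to transform.
    <R> ToyStream<R> process(ProcessFn<T, R> fn) {
      return transform("Process", new ProcessOperator<>(fn));
    }
  }

  public static void main(String[] args) {
    ToyStream<Integer> source = new ToyStream<>("source", List.of(1, 2, 3));
    ProcessFn<Integer, String> fn = (value, out) -> out.add("trigger-" + value);

    ToyStream<String> viaProcess = source.process(fn);
    ToyStream<String> viaTransform =
        source.transform("TriggerManager", new ProcessOperator<>(fn));

    System.out.println(viaProcess.elements); // [trigger-1, trigger-2, trigger-3]
    System.out.println(viaTransform.elements); // [trigger-1, trigger-2, trigger-3]
  }
}
```

Both calls produce the same elements, which is the point: the API difference between the two branches in the diff does not by itself require two code paths. Whether to keep them separate is a maintainability choice, which the rest of the thread addresses.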
>2. In the long run, if coordinateLock proves to be feasible, we might be
> able to remove these external locks later on. Keeping them separate now will
> make it easier to decouple them in the future.
I tend to agree. The effort to consolidate the two implementations is rather
high and will make removing the old implementation harder.
>I would love to remove the old code (LockManager altogether) after a
> release.
So in this sense the duplication is not bad, because it would allow us to
remove the old code easily.
Makes sense.