Guosmilesmile commented on code in PR #15042:
URL: https://github.com/apache/iceberg/pull/15042#discussion_r2717055143
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java:
##########
@@ -226,21 +258,43 @@ public void append() throws IOException {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
- DataStream<Trigger> triggers =
- DataStreamUtils.reinterpretAsKeyedStream(
- changeStream(tableName, loader), unused -> true)
- .process(
- new TriggerManager(
- loader,
- lockFactory,
- taskNames,
- evaluators,
- rateLimit.toMillis(),
- lockCheckDelay.toMillis()))
- .name(TRIGGER_MANAGER_OPERATOR_NAME)
- .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel()
+ DataStream<Trigger> triggers;
+ if (lockFactory == null) {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .transform(
+ TRIGGER_MANAGER_OPERATOR_NAME,
+ TypeInformation.of(Trigger.class),
+ new TriggerManagerOperatorFactory(
+ tableName,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
+ } else {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .process(
+ new TriggerManager(
+ loader,
+ lockFactory,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .name(TRIGGER_MANAGER_OPERATOR_NAME)
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
Review Comment:
I have thought about this question, but I haven't found a good way to solve it
yet. If you have any suggestions, please feel free to let me know.
1. TriggerManager and LockRemover are both ProcessFunction-based, while
TriggerManagerOperator and LockRemoverOperator are stream operators that must
be wired in via `transform`. Because their APIs differ at the Flink layer, I
did not combine them.
2. In the long run, if coordinateLock proves feasible, we may be able to
remove these external locks entirely. Keeping the two paths separate now will
make that decoupling easier later.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]