pnowojski commented on a change in pull request #10151: [FLINK-14231] Handle the pending processing-time timers to make endInput semantics on the operator chain strict URL: https://github.com/apache/flink/pull/10151#discussion_r368994586
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -1046,10 +1046,16 @@ TimerService getTimerService() { return timerService; } - public ProcessingTimeService getProcessingTimeService(int operatorIndex) { - Preconditions.checkState(timerService != null, "The timer service has not been initialized."); - MailboxExecutor mailboxExecutor = mailboxProcessor.getMailboxExecutor(operatorIndex); - return new ProcessingTimeServiceImpl(timerService, callback -> deferCallbackToMailbox(mailboxExecutor, callback)); Review comment: Why was this removed? Yes, the method is ugly and the solution would be to properly use `StreamOperatorFactory` concept, but the deletion of this method is having a very nasty effect, where `ProcessingTimeServiceAware` can be applied to a factory AND to an operator, which creates a confusion by allowing for a bit pointless patterns, where operator has a factory and both the operator and it's factory are `ProcessingTimeServiceAware`. Unless I'm missing something, I think it would be better for the API to keep this method, mark it `@Deprecated`, and never allow for `ProcessingTimeServiceAware` to be applied on an operator. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services