sundargates commented on a change in pull request #13988:
URL: https://github.com/apache/flink/pull/13988#discussion_r520199275

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -41,12 +43,14 @@
     void start();

     /**
-     * Handles the source event from the source reader.
+     * Handles the request for a split. This method is called when the reader with the given subtask
+     * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
      *
      * @param subtaskId the subtask id of the source reader who sent the source event.
-     * @param sourceEvent the source event from the source reader.
+     * @param requesterHostname Optional, the hostname where the requesting task is running.
+     *                          This can be used to make split assignments locality-aware.
      */
-    void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
+    void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

Review comment:
+1

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -87,4 +91,18 @@
      */
     @Override
     default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    /**
+     * Handles a custom source event from the source reader.
+     *
+     * <p>This method has a default implementation that does nothing, because it is only
+     * required to be implemented by some sources, which have a custom event protocol between
+     * reader and enumerator. The common events for reader registration and split requests
+     * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and
+     * {@link #handleSplitRequest(int, String)} methods.
+     *
+     * @param subtaskId the subtask id of the source reader who sent the source event.
+     * @param sourceEvent the source event from the source reader.
+     */
+    default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}

Review comment:
It looks like enumerators will silently ignore these events if an implementer forgets to override this method.
I wonder whether a better default would be to throw an unhandled-event exception or, at a minimum, to log the event so that the user is aware of the issue. Also keep in mind that the enumerator only receives custom events if the reader has decided to send them, which implies that the user is interested in these events in the first place.

##########
File path: flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
##########
@@ -83,6 +83,14 @@ default void assignSplit(SplitT split, int subtask) {
         assignSplits(new SplitsAssignment<>(split, subtask));
     }

+    /**
+     * Signals a subtask that it will not receive any further split.
+     *
+     * @param subtask The index of the operator's parallel subtask that shall be
+     *                signaled it will not receive any further split.
+     */
+    void signalNoMoreSplits(int subtask);

Review comment:
Would it also make sense to change this method signature to accept the set of subtasks to be notified? Also, can we have another method that notifies all registered readers?

```
void signalNoMoreSplits(Set<Integer> subtasksToBeNotified);

default void signalNoMoreSplits() {
    signalNoMoreSplits(registeredReaders().keySet());
}
```

This is especially useful for bounded cases, where you cannot know ahead of time whether a subtask might still be assigned splits in the future: assigned splits can always be added back and reassigned to other subtasks. Only when the system has reached a terminal state (i.e., no more pending splits and all splits completed) can you know for sure that all subtasks can be marked as done. Having this method would avoid going through the coordinator thread once for every registered subtask.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
##########
@@ -16,16 +16,16 @@
  * limitations under the License.
  */

-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;

-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;

 /**
  * A source event sent from the SplitEnumerator to the SourceReader to indicate that no more
  * splits will be assigned to the source reader anymore. So once the SplitReader finishes
  * reading the currently assigned splits, they can exit.
  */
-public class NoMoreSplitsEvent implements SourceEvent {
+public class NoMoreSplitsEvent implements OperatorEvent {

Review comment:
nit: should this be a final class?
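
For illustration only, here is a rough sketch of the two suggestions made in the comments above: a `handleSourceEvent` default that fails loudly instead of silently dropping the event, and a no-argument `signalNoMoreSplits()` that delegates to a set-based variant. None of this is Flink API. The types `SketchSourceEvent`, `SketchEnumerator`, `SketchEnumeratorContext` and `RecordingContext`, and the `registeredSubtasks()` accessor, are hypothetical stand-ins so that the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a custom reader-to-enumerator event. */
interface SketchSourceEvent {}

/** Hypothetical stand-in for the enumerator interface. */
interface SketchEnumerator {
    // Assumed alternative default: fail loudly rather than ignore the event,
    // so a forgotten override is noticed as soon as a reader sends an event.
    default void handleSourceEvent(int subtaskId, SketchSourceEvent event) {
        throw new UnsupportedOperationException(
                "Unhandled source event from subtask " + subtaskId + ": " + event);
    }
}

/** Hypothetical stand-in for the enumerator context. */
interface SketchEnumeratorContext {
    // Assumed accessor for the subtask ids of all currently registered readers.
    Set<Integer> registeredSubtasks();

    // Batch variant: signal several subtasks in a single call.
    void signalNoMoreSplits(Set<Integer> subtasksToBeNotified);

    // Convenience variant: signal every registered reader.
    default void signalNoMoreSplits() {
        signalNoMoreSplits(registeredSubtasks());
    }
}

/** Small recording implementation, used only to exercise the sketch. */
final class RecordingContext implements SketchEnumeratorContext {
    private final Set<Integer> registered = new LinkedHashSet<>();
    final List<Integer> signaled = new ArrayList<>();

    RecordingContext(int... subtasks) {
        for (int subtask : subtasks) {
            registered.add(subtask);
        }
    }

    @Override
    public Set<Integer> registeredSubtasks() {
        return registered;
    }

    @Override
    public void signalNoMoreSplits(Set<Integer> subtasksToBeNotified) {
        // Record the signaled subtasks in iteration order.
        signaled.addAll(subtasksToBeNotified);
    }
}
```

With this shape, an enumerator that reaches its terminal state could call `signalNoMoreSplits()` once instead of looping over subtasks. Whether throwing or logging is the better default for unhandled events remains the open question raised in the review.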