sundargates commented on a change in pull request #13988:
URL: https://github.com/apache/flink/pull/13988#discussion_r520199275



##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -41,12 +43,14 @@
        void start();
 
        /**
-        * Handles the source event from the source reader.
+        * Handles the request for a split. This method is called when the reader with the given subtask
+        * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
         *
         * @param subtaskId the subtask id of the source reader who sent the source event.
-        * @param sourceEvent the source event from the source reader.
+        * @param requesterHostname Optional, the hostname where the requesting task is running.
+        *                          This can be used to make split assignments locality-aware.
         */
-       void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
+       void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

Review comment:
       +1

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -87,4 +91,18 @@
         */
        @Override
        default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+       /**
+        * Handles a custom source event from the source reader.
+        *
+        * <p>This method has a default implementation that does nothing, because it is only
+        * required to be implemented by some sources, which have a custom event protocol between
+        * reader and enumerator. The common events for reader registration and split requests
+        * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and
+        * {@link #handleSplitRequest(int, String)} methods.
+        *
+        * @param subtaskId the subtask id of the source reader who sent the source event.
+        * @param sourceEvent the source event from the source reader.
+        */
+       default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}

Review comment:
       It looks like enumerators will silently ignore these events if the 
implementer forgets to override this method. Would a better default be to 
either throw an unhandled-event exception or, at a minimum, log the event so 
that the user is aware of the issue? Also keep in mind that the enumerator 
only receives custom events if the reader decides to send them, which implies 
that the user is interested in these events in the first place.
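
A minimal sketch of the "fail loudly" default suggested above. All types here (`DemoSourceEvent`, `DemoEnumerator`, `ForgetfulEnumerator`, `RecordingEnumerator`, `PingEvent`) are hypothetical stand-ins invented for illustration; they are not the real Flink interfaces, and the choice of `UnsupportedOperationException` is an assumption rather than something the PR prescribes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Flink's SourceEvent marker interface. */
interface DemoSourceEvent {}

/** Hypothetical, trimmed-down enumerator interface; NOT the real SplitEnumerator. */
interface DemoEnumerator {
    /** Default that surfaces unhandled custom events instead of silently dropping them. */
    default void handleSourceEvent(int subtaskId, DemoSourceEvent event) {
        throw new UnsupportedOperationException(
                "Unhandled source event " + event + " from subtask " + subtaskId
                        + "; override handleSourceEvent to process custom events.");
    }
}

/** Enumerator that forgot to override: the event is reported, not ignored. */
final class ForgetfulEnumerator implements DemoEnumerator {}

/** Enumerator with a custom protocol: it overrides the method and records events. */
final class RecordingEnumerator implements DemoEnumerator {
    final List<String> received = new ArrayList<>();

    @Override
    public void handleSourceEvent(int subtaskId, DemoSourceEvent event) {
        received.add(subtaskId + ":" + event);
    }
}

/** Hypothetical custom event sent by a reader. */
final class PingEvent implements DemoSourceEvent {
    @Override
    public String toString() {
        return "PingEvent";
    }
}

final class UnhandledEventDemo {
    public static void main(String[] args) {
        RecordingEnumerator ok = new RecordingEnumerator();
        ok.handleSourceEvent(3, new PingEvent());
        System.out.println("recorded: " + ok.received);

        try {
            new ForgetfulEnumerator().handleSourceEvent(1, new PingEvent());
        } catch (UnsupportedOperationException e) {
            System.out.println("surfaced: " + e.getMessage());
        }
    }
}
```

Logging at WARN level instead of throwing would be the softer variant of the same idea; which one fits better depends on whether a dropped custom event should fail the job.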

##########
File path: 
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
##########
@@ -83,6 +83,14 @@ default void assignSplit(SplitT split, int subtask) {
                assignSplits(new SplitsAssignment<>(split, subtask));
        }
 
+       /**
+        * Signals a subtask that it will not receive any further split.
+        *
+        * @param subtask The index of the operator's parallel subtask that shall be
+        *                signaled it will not receive any further split.
+        */
+       void signalNoMoreSplits(int subtask);

Review comment:
       Would it also make sense to change this method's signature to accept the 
set of subtasks to be notified? Also, could we add another method that 
notifies all registered readers?
   
   ```
   void signalNoMoreSplits(Set<Integer> subtasksToBeNotified);
   
   default void signalNoMoreSplits() {
     signalNoMoreSplits(registeredReaders().keySet());
   }
   ```
   
   This is especially useful for bounded sources, where you cannot know ahead 
of time whether a subtask might still be assigned splits: assigned splits can 
always be added back and reassigned to other subtasks. Only once the system 
has reached a terminal state (i.e., no pending splits remain and all splits 
are completed) can you know for sure that all subtasks can be marked as done. 
A batch method would avoid going through the coordinator thread once for 
every registered subtask.
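
A minimal sketch of how the batch variant proposed above could behave. `DemoEnumeratorContext` and `RecordingContext` are hypothetical stand-ins written for this illustration, not the real `SplitEnumeratorContext`, and the reader map's value type is simplified to a hostname string as an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, trimmed-down context; NOT the real SplitEnumeratorContext. */
interface DemoEnumeratorContext {
    /** Registered readers keyed by subtask id (value simplified to a hostname). */
    Map<Integer, String> registeredReaders();

    /** Signals every subtask in the given set in a single call. */
    void signalNoMoreSplits(Set<Integer> subtasksToBeNotified);

    /** Convenience overload: signals all currently registered readers. */
    default void signalNoMoreSplits() {
        signalNoMoreSplits(registeredReaders().keySet());
    }
}

/** In-memory implementation that records which subtasks were signaled. */
final class RecordingContext implements DemoEnumeratorContext {
    private final Map<Integer, String> readers = new LinkedHashMap<>();
    final List<Integer> signaled = new ArrayList<>();

    void register(int subtask, String hostname) {
        readers.put(subtask, hostname);
    }

    @Override
    public Map<Integer, String> registeredReaders() {
        return readers;
    }

    @Override
    public void signalNoMoreSplits(Set<Integer> subtasksToBeNotified) {
        // A real implementation would send one event per subtask from here.
        signaled.addAll(subtasksToBeNotified);
    }
}

final class BatchSignalDemo {
    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        ctx.register(0, "host-a");
        ctx.register(1, "host-b");
        // Terminal state reached: notify every registered reader at once.
        ctx.signalNoMoreSplits();
        System.out.println("signaled subtasks: " + ctx.signaled);
    }
}
```

With this shape, an enumerator that detects the terminal state makes one call instead of looping over every registered subtask.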

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
##########
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
  * A source event sent from the SplitEnumerator to the SourceReader to indicate that no more
  * splits will be assigned to the source reader anymore. So once the SplitReader finishes
  * reading the currently assigned splits, they can exit.
  */
-public class NoMoreSplitsEvent implements SourceEvent {
+public class NoMoreSplitsEvent implements OperatorEvent {

Review comment:
       nit: should this be a final class?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

