This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e281f75386fad5fb2c0c3b10e246b37d02e432d
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 9 00:27:34 2020 +0100

    [FLINK-20049][core] Add built-in method to request split in source API.
    
    This replaces the custom event handling done by many source implementations.
---
 .../source/reader/mocks/MockSplitEnumerator.java   | 11 +++---
 .../src/impl/ContinuousFileSplitEnumerator.java    | 17 ++++-----
 .../connector/file/src/impl/FileSourceReader.java  |  9 ++---
 .../file/src/impl/StaticFileSplitEnumerator.java   | 42 +++++++++-------------
 .../file/src/FileSourceHeavyThroughputTest.java    |  3 ++
 .../hive/ContinuousHiveSplitEnumerator.java        | 16 +++++----
 .../source/enumerator/KafkaSourceEnumerator.java   |  7 ++--
 .../kafka/source/reader/KafkaSourceReaderTest.java |  5 +--
 .../api/connector/source/SourceReaderContext.java  |  7 ++++
 .../api/connector/source/SplitEnumerator.java      | 24 +++++++++++--
 .../source/lib/util/IteratorSourceEnumerator.java  | 11 ++----
 .../source/lib/util/IteratorSourceReader.java      |  3 +-
 .../source/mocks/MockSplitEnumerator.java          |  5 +++
 .../source/mocks/TestingReaderContext.java         |  3 ++
 .../source/coordinator/SourceCoordinator.java      |  8 ++++-
 .../runtime}/source/event/RequestSplitEvent.java   |  6 ++--
 .../streaming/api/operators/SourceOperator.java    |  6 ++++
 .../checkpointing/UnalignedCheckpointITCase.java   |  5 ++-
 18 files changed, 111 insertions(+), 77 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 77da1ba..134a21f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,14 +47,13 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Lis
        }
 
        @Override
-       public void start() {
-
-       }
+       public void start() {}
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
 
-       }
+       @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {}
 
        @Override
        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
index 59ccece..74c2b99 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
 import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
@@ -32,6 +31,8 @@ import org.apache.flink.core.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
@@ -106,14 +107,14 @@ public class ContinuousFileSplitEnumerator implements 
SplitEnumerator<FileSource
        }
 
        @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+               readersAwaitingSplit.put(subtaskId, requesterHostname);
+               assignSplits();
+       }
+
+       @Override
        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (sourceEvent instanceof RequestSplitEvent) {
-                       readersAwaitingSplit.put(subtaskId, 
((RequestSplitEvent) sourceEvent).hostName());
-                       assignSplits();
-               }
-               else {
-                       LOG.error("Received unrecognized event: {}", 
sourceEvent);
-               }
+               LOG.error("Received unrecognized event: {}", sourceEvent);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
index 8654211..7b28566 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/FileSourceReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.file.src.impl;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.file.src.FileSourceSplit;
@@ -48,12 +47,12 @@ public final class FileSourceReader<T, SplitT extends 
FileSourceSplit>
 
        @Override
        public void start() {
-               requestSplit();
+               context.sendSplitRequest();
        }
 
        @Override
        protected void onSplitFinished(Collection<String> finishedSplitIds) {
-               requestSplit();
+               context.sendSplitRequest();
        }
 
        @Override
@@ -66,10 +65,6 @@ public final class FileSourceReader<T, SplitT extends 
FileSourceSplit>
                return splitState.toFileSourceSplit();
        }
 
-       private void requestSplit() {
-               context.sendSourceEventToCoordinator(new 
RequestSplitEvent(context.getLocalHostName()));
-       }
-
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {}
 }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
index de10682..7dcf703 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
@@ -84,30 +83,7 @@ public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSpli
        }
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (sourceEvent instanceof RequestSplitEvent) {
-                       final RequestSplitEvent requestSplitEvent = 
(RequestSplitEvent) sourceEvent;
-                       assignNextEvents(subtaskId, 
requestSplitEvent.hostName());
-               }
-               else {
-                       LOG.error("Received unrecognized event: {}", 
sourceEvent);
-               }
-       }
-
-       @Override
-       public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
-               LOG.debug("File Source Enumerator adds splits back: {}", 
splits);
-               splitAssigner.addSplits(splits);
-       }
-
-       @Override
-       public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
-               return 
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
-       }
-
-       // ------------------------------------------------------------------------
-
-       private void assignNextEvents(int subtask, @Nullable String hostname) {
+       public void handleSplitRequest(int subtask, @Nullable String hostname) {
                if (LOG.isInfoEnabled()) {
                        final String hostInfo = hostname == null ? "(no host 
locality info)" : "(on host '" + hostname + "')";
                        LOG.info("Subtask {} {} is requesting a file source 
split", subtask, hostInfo);
@@ -124,4 +100,20 @@ public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSpli
                        LOG.info("No more splits available for subtask {}", 
subtask);
                }
        }
+
+       @Override
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+               LOG.error("Received unrecognized event: {}", sourceEvent);
+       }
+
+       @Override
+       public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+               LOG.debug("File Source Enumerator adds splits back: {}", 
splits);
+               splitAssigner.addSplits(splits);
+       }
+
+       @Override
+       public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
+               return 
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
+       }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index a789605..9a43643 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -206,6 +206,9 @@ public class FileSourceHeavyThroughputTest {
                }
 
                @Override
+               public void sendSplitRequest() {}
+
+               @Override
                public void sendSourceEventToCoordinator(SourceEvent 
sourceEvent) {}
        }
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index 628203f..31aa490 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.connectors.hive;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.file.src.FileSourceSplit;
 import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
@@ -35,6 +34,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -111,13 +112,14 @@ public class ContinuousHiveSplitEnumerator<T extends 
Comparable<T>> implements S
        }
 
        @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String 
hostName) {
+               readersAwaitingSplit.put(subtaskId, hostName);
+               assignSplits();
+       }
+
+       @Override
        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (sourceEvent instanceof RequestSplitEvent) {
-                       readersAwaitingSplit.put(subtaskId, 
((RequestSplitEvent) sourceEvent).hostName());
-                       assignSplits();
-               } else {
-                       LOG.error("Received unrecognized event: {}", 
sourceEvent);
-               }
+               LOG.error("Received unrecognized event: {}", sourceEvent);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index b3c6888..e78df5c 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.kafka.source.enumerator;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
@@ -40,6 +39,8 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -136,8 +137,8 @@ public class KafkaSourceEnumerator implements 
SplitEnumerator<KafkaPartitionSpli
        }
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-
+       public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+               // the kafka source pushes splits eagerly, rather than act upon split requests
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index a802d63..7e7f646 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -172,9 +172,10 @@ public class KafkaSourceReaderTest extends 
SourceReaderTestBase<KafkaPartitionSp
                        }
 
                        @Override
-                       public void sendSourceEventToCoordinator(SourceEvent 
sourceEvent) {
+                       public void sendSplitRequest() {}
 
-                       }
+                       @Override
+                       public void sendSourceEventToCoordinator(SourceEvent 
sourceEvent) {}
                });
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 4d9e53a..f1ea255 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -50,6 +50,13 @@ public interface SourceReaderContext {
        int getIndexOfSubtask();
 
        /**
+        * Sends a split request to the source's {@link SplitEnumerator}.
+        * This will result in a call to the {@link SplitEnumerator#handleSplitRequest(int, String)} method,
+        * with this reader's parallel subtask id and the hostname where this reader runs.
+        */
+       void sendSplitRequest();
+
+       /**
         * Send a source event to the source coordinator.
         *
         * @param sourceEvent the source event to coordinator.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index d049c47..e19bb4b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.connector.source;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.state.CheckpointListener;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -41,12 +43,14 @@ public interface SplitEnumerator<SplitT extends 
SourceSplit, CheckpointT>
        void start();
 
        /**
-        * Handles the source event from the source reader.
+        * Handles the request for a split. This method is called when the reader with the given subtask
+        * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
         *
         * @param subtaskId the subtask id of the source reader who sent the source event.
-        * @param sourceEvent the source event from the source reader.
+        * @param requesterHostname Optional, the hostname where the requesting task is running.
+        *                          This can be used to make split assignments locality-aware.
         */
-       void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
+       void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
 
        /**
         * Add a split back to the split enumerator. It will only happen when a 
{@link SourceReader} fails
@@ -87,4 +91,18 @@ public interface SplitEnumerator<SplitT extends SourceSplit, 
CheckpointT>
         */
        @Override
        default void notifyCheckpointComplete(long checkpointId) throws 
Exception {}
+
+       /**
+        * Handles a custom source event from the source reader.
+        *
+        * <p>This method has a default implementation that does nothing, because it is only
+        * required to be implemented by some sources, which have a custom event protocol between
+        * reader and enumerator. The common events for reader registration and split requests
+        * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and
+        * {@link #handleSplitRequest(int, String)} methods.
+        *
+        * @param subtaskId the subtask id of the source reader who sent the source event.
+        * @param sourceEvent the source event from the source reader.
+        */
+       default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 61e806d..0a4efe3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.connector.source.lib.util;
 
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
-import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
 import java.util.Collection;
@@ -56,11 +55,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
        public void close() {}
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (!(sourceEvent instanceof RequestSplitEvent)) {
-                       throw new FlinkRuntimeException("Unrecognized event: " 
+ sourceEvent);
-               }
-
+       public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
                final SplitT nextSplit = remainingSplits.poll();
                if (nextSplit != null) {
                        context.assignSplit(nextSplit, subtaskId);
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 5c6631e..92ab054 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.core.io.InputStatus;
 
 import javax.annotation.Nullable;
@@ -83,7 +82,7 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        public void start() {
                // request a split only if we did not get one during restore
                if (iterator == null) {
-                       context.sendSourceEventToCoordinator(new 
RequestSplitEvent());
+                       context.sendSplitRequest();
                }
        }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 0525533..bc034b0 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,6 +73,9 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Set
        }
 
        @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {}
+
+       @Override
        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                handledSourceEvent.add(sourceEvent);
        }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
index 3f254e4..98aad54 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
@@ -69,6 +69,9 @@ public class TestingReaderContext implements 
SourceReaderContext {
        }
 
        @Override
+       public void sendSplitRequest() {}
+
+       @Override
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
                sentEvents.add(sourceEvent);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index ea56230..10baf02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -30,7 +30,9 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,10 +134,14 @@ public class SourceCoordinator<SplitT extends 
SourceSplit, EnumChkT> implements
                coordinatorExecutor.execute(() -> {
                        try {
                                LOG.debug("Handling event from subtask {} of 
source {}: {}", subtask, operatorName, event);
-                               if (event instanceof SourceEventWrapper) {
+                               if (event instanceof RequestSplitEvent) {
+                                       enumerator.handleSplitRequest(subtask, 
((RequestSplitEvent) event).hostName());
+                               } else if (event instanceof SourceEventWrapper) 
{
                                        enumerator.handleSourceEvent(subtask, 
((SourceEventWrapper) event).getSourceEvent());
                                } else if (event instanceof 
ReaderRegistrationEvent) {
                                        
handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+                               } else {
+                                       throw new FlinkException("Unrecognized 
Operator Event: " + event);
                                }
                        } catch (Exception e) {
                                LOG.error("Failing the job due to exception 
when handling operator event {} from subtask {} " +
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
similarity index 92%
rename from 
flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
index 9ff2293..6e7166b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 import javax.annotation.Nullable;
 
@@ -30,7 +30,7 @@ import java.util.Objects;
  * <p>This event optionally carries the hostname of the location where the 
reader runs, to support
  * locality-aware work assignment.
  */
-public final class RequestSplitEvent implements SourceEvent {
+public final class RequestSplitEvent implements OperatorEvent {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 80043cf..de3358b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -165,6 +166,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit>
                        }
 
                        @Override
+                       public void sendSplitRequest() {
+                               operatorEventGateway.sendEventToCoordinator(new 
RequestSplitEvent(getLocalHostName()));
+                       }
+
+                       @Override
                        public void sendSourceEventToCoordinator(SourceEvent 
event) {
                                operatorEventGateway.sendEventToCoordinator(new 
SourceEventWrapper(event));
                        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index f2a9fa9..5a036db 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
@@ -71,6 +70,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -367,8 +367,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
                        }
 
                        @Override
-                       public void handleSourceEvent(int subtaskId, 
SourceEvent sourceEvent) {
-                       }
+                       public void handleSplitRequest(int subtaskId, @Nullable 
String requesterHostname) {}
 
                        @Override
                        public void addSplitsBack(List<LongSplit> splits, int 
subtaskId) {

Reply via email to