This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc96de64929767b580f5e925bec3849485de4bfa
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Jun 29 16:58:00 2020 +0200

    [hotfix][core] Add to Source Enumerator convenience methods to assign single split
---
 .../flink/api/connector/source/SplitEnumeratorContext.java  | 13 +++++++++++++
 .../apache/flink/api/connector/source/SplitsAssignment.java |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 5aee6dd..8ec8618 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -71,6 +71,19 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
        void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);
 
        /**
+        * Assigns a single split.
+        *
+        * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+        * call to the {@link #assignSplits(SplitsAssignment)} method.
+        *
+        * @param split The new split
+        * @param subtask The index of the operator's parallel subtask that shall receive the split.
+        */
+       default void assignSplit(SplitT split, int subtask) {
+               assignSplits(new SplitsAssignment<>(split, subtask));
+       }
+
+       /**
         * Invoke the callable and handover the return value to the handler which will be executed
         * by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s
         * may be executed in a thread pool concurrently.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index 6331788..5c08922 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,12 +33,18 @@ import java.util.Map;
  */
 @PublicEvolving
 public final class SplitsAssignment<SplitT extends SourceSplit> {
+
        private final Map<Integer, List<SplitT>> assignment;
 
        public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
                this.assignment = assignment;
        }
 
+       public SplitsAssignment(SplitT split, int subtask) {
+               this.assignment = new HashMap<>();
+               this.assignment.put(subtask, Collections.singletonList(split));
+       }
+
        /**
         * @return A mapping from subtask ID to their split assignment.
         */

Reply via email to