This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bc96de64929767b580f5e925bec3849485de4bfa Author: Stephan Ewen <[email protected]> AuthorDate: Mon Jun 29 16:58:00 2020 +0200 [hotfix][core] Add to Source Enumerator convenience methods to assign single split --- .../flink/api/connector/source/SplitEnumeratorContext.java | 13 +++++++++++++ .../apache/flink/api/connector/source/SplitsAssignment.java | 8 ++++++++ 2 files changed, 21 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java index 5aee6dd..8ec8618 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java @@ -71,6 +71,19 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { void assignSplits(SplitsAssignment<SplitT> newSplitAssignments); /** + * Assigns a single split. + * + * <p>When assigning multiple splits, it is more efficient to assign all of them in a single + * call to the {@link #assignSplits(SplitsAssignment)} method. + * + * @param split The new split + * @param subtask The index of the operator's parallel subtask that shall receive the split. + */ + default void assignSplit(SplitT split, int subtask) { + assignSplits(new SplitsAssignment<>(split, subtask)); + } + + /** * Invoke the callable and handover the return value to the handler which will be executed * by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s * may be executed in a thread pool concurrently. diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java index 6331788..5c08922 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java @@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.PublicEvolving; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,12 +33,18 @@ import java.util.Map; */ @PublicEvolving public final class SplitsAssignment<SplitT extends SourceSplit> { + private final Map<Integer, List<SplitT>> assignment; public SplitsAssignment(Map<Integer, List<SplitT>> assignment) { this.assignment = assignment; } + public SplitsAssignment(SplitT split, int subtask) { + this.assignment = new HashMap<>(); + this.assignment.put(subtask, Collections.singletonList(split)); + } + /** * @return A mapping from subtask ID to their split assignment. */
