AHeise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r646563518



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
+
+    private final SourceChain<T, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ?> sourceChain) {

Review comment:
       We should use a builder pattern as with the other sources. Then you could promote the inner `SourceChain` class to a builder and pass the list of sources here.
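   
   To illustrate the direction (just a sketch; the builder name and the `SourceChain` constructor it assumes are hypothetical, not part of this PR):
   
   ```java
   // Sketch only: a builder replacing the public SourceChain-based constructor.
   // Source, SourceSplit, Tuple2, CheckpointConverter and Preconditions as used in this PR.
   public static class HybridSourceBuilder<T> {
       private final List<Tuple2<Source<T, ? extends SourceSplit, ?>, CheckpointConverter<?, ?>>>
               sources = new ArrayList<>();
   
       /** Adds a source with a fixed, preconfigured start position. */
       public HybridSourceBuilder<T> addSource(Source<T, ? extends SourceSplit, ?> source) {
           return addSource(source, null);
       }
   
       /** Adds a source whose start position is derived from the previous source at switch time. */
       public HybridSourceBuilder<T> addSource(
               Source<T, ? extends SourceSplit, ?> source, CheckpointConverter<?, ?> converter) {
           sources.add(Tuple2.of(source, converter));
           return this;
       }
   
       public HybridSource<T> build() {
           Preconditions.checkArgument(!sources.isEmpty(), "at least one source required");
           // assumes a SourceChain constructor accepting the assembled list
           return new HybridSource<>(new SourceChain<>(sources));
       }
   }
   ```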

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {

Review comment:
       We could rename this to `ConcatenatedSource(s)`, `SourceSequence`, or similar (also going to post that to the ML thread).
   Hybrid has the connotation of 2 for me (maybe because I'm a non-native speaker) and does not carry the concatenation concept as well (hybrid sounds to me more like the source would constantly switch back and forth).

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
+
+    private final SourceChain<T, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ?> sourceChain) {
+        Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+        for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+            Preconditions.checkArgument(
+                    Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+                    "All sources except the final source need to be bounded.");
+        }
+        this.sourceChain = sourceChain;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness();
+    }
+
+    @Override
+    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        List<SourceReader<T, ? extends SourceSplit>> readers = new ArrayList<>();
+        for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> source : sourceChain.sources) {
+            readers.add(source.f0.createReader(readerContext));
+        }
+        return new HybridSourceReader(readerContext, readers);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
+        return new HybridSourceSplitEnumerator(enumContext, sourceChain, 0);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext,
+            HybridSourceEnumeratorState checkpoint)
+            throws Exception {
+        return new HybridSourceSplitEnumerator(
+                enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
+        List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getSplitSerializer())));
+        return new HybridSourceSplitSerializer(serializers);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer())));
+        return new HybridSourceEnumeratorStateSerializer(serializers);
+    }
+
+    private static <T> SimpleVersionedSerializer<T> castSerializer(
+            SimpleVersionedSerializer<? extends T> s) {
+        @SuppressWarnings("rawtypes")
+        SimpleVersionedSerializer s1 = s;
+        return s1;
+    }
+
+    /**
+     * Converts checkpoint between sources to transfer end position to next source's start position.
+     * Only required for dynamic position transfer at time of switching, otherwise source can be
+     * preconfigured with a start position during job submission.
+     */
+    public interface CheckpointConverter<InCheckpointT, OutCheckpointT>

Review comment:
       Should this converter also convert in the other direction in case of a failure?
   
   Let's say you have a dynamic handover from File to Kafka 1h before NOW because Kafka has only 2h retention.
   You just switched to Kafka, have a checkpoint, and then the failure occurs.
   Now your recovery takes longer than 1h and you have to go back to the file using the offset from Kafka.
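   
   Roughly (a hypothetical two-way variant; `revert` does not exist in this PR):
   
   ```java
   // Sketch: `convert` is today's switch-time direction, `revert` would map the
   // next source's position back so the enumerator can fall back to the previous
   // source when the converted position is no longer readable after recovery.
   public interface CheckpointConverter<InCheckpointT, OutCheckpointT> extends Serializable {
       OutCheckpointT convert(InCheckpointT checkpoint);
   
       // e.g. Kafka offset/timestamp -> file position, used only on recovery
       InCheckpointT revert(OutCheckpointT checkpoint);
   }
   ```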

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
+
+    private final SourceChain<T, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ?> sourceChain) {
+        Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+        for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+            Preconditions.checkArgument(
+                    Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+                    "All sources except the final source need to be bounded.");
+        }
+        this.sourceChain = sourceChain;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness();
+    }
+
+    @Override
+    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        List<SourceReader<T, ? extends SourceSplit>> readers = new ArrayList<>();
+        for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> source : sourceChain.sources) {
+            readers.add(source.f0.createReader(readerContext));
+        }
+        return new HybridSourceReader(readerContext, readers);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext) {
+        return new HybridSourceSplitEnumerator(enumContext, sourceChain, 0);
+    }
+
+    @Override
+    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> enumContext,
+            HybridSourceEnumeratorState checkpoint)
+            throws Exception {
+        return new HybridSourceSplitEnumerator(
+                enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
+        List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getSplitSerializer())));
+        return new HybridSourceSplitSerializer(serializers);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<HybridSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
+        sourceChain.sources.forEach(
+                t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer())));
+        return new HybridSourceEnumeratorStateSerializer(serializers);
+    }
+
+    private static <T> SimpleVersionedSerializer<T> castSerializer(
+            SimpleVersionedSerializer<? extends T> s) {
+        @SuppressWarnings("rawtypes")
+        SimpleVersionedSerializer s1 = s;
+        return s1;
+    }
+
+    /**
+     * Converts checkpoint between sources to transfer end position to next source's start position.
+     * Only required for dynamic position transfer at time of switching, otherwise source can be
+     * preconfigured with a start position during job submission.
+     */
+    public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
+            extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+
+    /** Chain of sources with option to convert start position at switch-time. */
+    public static class SourceChain<T, EnumChkT> implements Serializable {
+        final List<Tuple2<Source<T, ? extends SourceSplit, ?>, CheckpointConverter<?, ?>>> sources;

Review comment:
       In the non-PoC version, we should replace Tuple2 with some inner record type.
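   
   Something like this, for illustration (hypothetical name):
   
   ```java
   // Sketch: named fields instead of Tuple2#f0/f1.
   static final class SourceListEntry<T> implements Serializable {
       final Source<T, ? extends SourceSplit, ?> source;
       final CheckpointConverter<?, ?> converter; // null when the start position is preconfigured
   
       SourceListEntry(
               Source<T, ? extends SourceSplit, ?> source, CheckpointConverter<?, ?> converter) {
           this.source = source;
           this.converter = converter;
       }
   }
   ```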

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator<SplitT extends SourceSplit>
+        implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, SplitT, Object> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments;
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;
+    private int currentSourceIndex;
+    private SplitEnumerator<SplitT, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain) {
+        this(context, sourceChain, 0);
+    }
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain,
+            int initialSourceIndex) {
+        Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size());
+        this.context = context;
+        this.sourceChain = sourceChain;
+        this.currentSourceIndex = initialSourceIndex;
+        this.assignments = new HashMap<>();
+        this.pendingSplits = new HashMap<>();
+        this.pendingReaders = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        // TODO: test coverage for on demand split assignment
+        assignPendingSplits(subtaskId);
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits);
+        // Splits returned can belong to multiple sources, after switching since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>();
+
+        for (HybridSourceSplit<SplitT> split : splits) {
+            splitsBySourceIndex
+                    .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>())
+                    .add(split);
+        }
+
+        splitsBySourceIndex.forEach(
+                (k, splitsPerSource) -> {
+                    if (k == currentSourceIndex) {
+                        currentEnumerator.addSplitsBack(
+                                HybridSourceReader.unwrapSplits(splitsPerSource), subtaskId);
+                    } else {
+                        pendingSplits
+                                .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>())
+                                .put(k, splitsPerSource);
+                        if (context.registeredReaders().containsKey(subtaskId)) {
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("addReader subtaskId={}", subtaskId);
+        if (pendingSplits.isEmpty()) {
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex));
+            LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex);
+            currentEnumerator.addReader(subtaskId);
+        } else {
+            // Defer adding reader to the current enumerator until splits belonging to earlier
+            // enumerators that were added back have been processed
+            pendingReaders.add(subtaskId);
+            assignPendingSplits(subtaskId);
+        }
+    }
+
+    private void assignPendingSplits(int subtaskId) {
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                pendingSplits.get(subtaskId);
+        if (splitsBySource != null) {
+            int sourceIndex = splitsBySource.firstKey();
+            List<HybridSourceSplit<SplitT>> splits =
+                    Preconditions.checkNotNull(splitsBySource.get(sourceIndex));
+            Preconditions.checkState(!splits.isEmpty());
+            LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits);
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex));
+            context.assignSplits(
+                    new SplitsAssignment<HybridSourceSplit>(
+                            Collections.singletonMap(subtaskId, (List) splits)));
+            context.signalNoMoreSplits(subtaskId);
+            // Empty collection indicates that splits have been assigned
+            splits.clear();
+        }
+    }
+
+    @Override
+    public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
+        Object enumState = currentEnumerator.snapshotState(checkpointId);
+        return new HybridSourceEnumeratorState(currentSourceIndex, enumState);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceReaderFinishedEvent) {
+            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;
+            if (srfe.sourceIndex() != currentSourceIndex) {
+                if (srfe.sourceIndex() < currentSourceIndex) {
+                    // Assign pending splits if any
+                    TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                            pendingSplits.get(subtaskId);
+                    if (splitsBySource != null) {
+                        List<HybridSourceSplit<SplitT>> splits =
+                                splitsBySource.get(srfe.sourceIndex());
+                        if (splits != null && splits.isEmpty()) {
+                            // Splits have been processed by the reader
+                            splitsBySource.remove(srfe.sourceIndex());
+                        }
+                        if (splitsBySource.isEmpty()) {
+                            pendingSplits.remove(subtaskId);
+                        } else {
+                            Integer nextSubtaskSourceIndex = splitsBySource.firstKey();
+                            LOG.debug(
+                                    "Restore subtask={}, sourceIndex={}",
+                                    subtaskId,
+                                    nextSubtaskSourceIndex);
+                            context.sendEventToSourceReader(
+                                    subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex));
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                    // Once all pending splits have been processed, add the readers to the current
+                    // enumerator, which may in turn trigger new split assignments
+                    if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) {
+                        // Advance pending readers to current enumerator
+                        LOG.debug(
+                                "Adding pending readers {} to enumerator currentSourceIndex={}",
+                                pendingReaders,
+                                currentSourceIndex);
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            context.sendEventToSourceReader(
+                                    pendingReaderSubtaskId,
+                                    new SwitchSourceEvent(currentSourceIndex));
+                        }
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            currentEnumerator.addReader(pendingReaderSubtaskId);
+                        }
+                        pendingReaders.clear();
+                    }
+                } else {
+                    // enumerator already switched
+                    LOG.debug("Ignoring out of order event {}", srfe);
+                }
+                return;
+            }

Review comment:
       For such a long method, I'd prefer more `return`s to keep the nesting down, or simply extract methods.
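   
   E.g. along these lines (abbreviated sketch; the extracted helper names are made up):
   
   ```java
   @Override
   public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
       if (!(sourceEvent instanceof SourceReaderFinishedEvent)) {
           // ... existing handling of other events ...
           return;
       }
       SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;
       if (srfe.sourceIndex() > currentSourceIndex) {
           // enumerator already switched
           LOG.debug("Ignoring out of order event {}", srfe);
           return;
       }
       if (srfe.sourceIndex() < currentSourceIndex) {
           processPendingSplitsAndReaders(subtaskId, srfe.sourceIndex()); // extracted method
           return;
       }
       handleCurrentSourceFinished(subtaskId); // extracted method
   }
   ```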

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. */
+public class HybridSourceEnumeratorStateSerializer

Review comment:
       Do we need to store which sources are currently in the hybrid source? What happens if a user adds/removes a source before recovering?
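   
   For example (rough sketch; `numSources` would be a new field, and detecting a changed chain would at least allow failing fast on restore):
   
   ```java
   @Override
   public byte[] serialize(HybridSourceEnumeratorState state) throws IOException {
       try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(baos)) {
           out.writeInt(numSources); // hypothetical: size of the configured source chain
           out.writeInt(state.getCurrentSourceIndex());
           // ... delegate the wrapped state to the serializer of the active source ...
           out.flush();
           return baos.toByteArray();
       }
   }
   ```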

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * <p>This reader is setup with a sequence of underlying source readers. At a given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on demand as determined
+ * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}.
+ *
+ * <p>When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}.
+ */
+public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);
+    private final SourceReaderContext readerContext;
+    private final List<SourceReader<T, ? extends SourceSplit>> chainedReaders;
+    private int currentSourceIndex = -1;
+    private SourceReader<T, ? extends SourceSplit> currentReader;
+    private CompletableFuture<Void> availabilityFuture;
+
+    public HybridSourceReader(
+            SourceReaderContext readerContext,
+            List<SourceReader<T, ? extends SourceSplit>> readers) {
+        this.readerContext = readerContext;
+        this.chainedReaders = readers;
+    }
+
+    @Override
+    public void start() {
+        setCurrentReader(0);

Review comment:
       Is it intended that we always start at 0 even after HA failover? Do we quickly go over already processed sub-sources then?

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. These splits may originate from a previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the previous readers. After
+ * returned splits were processed, delegation to the current underlying enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit>> assignments;
+    // Splits that have been returned due to subtask reset
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;
+    private int currentSourceIndex;
+    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, ?> sourceChain,
+            int initialSourceIndex) {
+        Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size());
+        this.context = context;
+        this.sourceChain = sourceChain;
+        this.currentSourceIndex = initialSourceIndex;
+        this.assignments = new HashMap<>();
+        this.pendingSplits = new HashMap<>();
+        this.pendingReaders = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        // TODO: test coverage for on demand split assignment
+        assignPendingSplits(subtaskId);
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits);
+        // Splits returned can belong to multiple sources, after switching since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>();

Review comment:
       Aren't we always polling the splits from the first source? Could we use some kind of `PriorityQueue`?
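   
   Minimal sketch of the idea (using `java.util.PriorityQueue` and the existing `HybridSourceSplit#sourceIndex()`):
   
   ```java
   // Order returned splits by source index; poll() then always yields a split
   // of the earliest (lowest-index) pending source.
   PriorityQueue<HybridSourceSplit> pending =
           new PriorityQueue<>(Comparator.comparingInt(HybridSourceSplit::sourceIndex));
   pending.addAll(splits);
   HybridSourceSplit next = pending.poll();
   ```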

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * <p>This reader is setup with a sequence of underlying source readers. At a given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on demand as determined
+ * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}.
+ *
+ * <p>When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}.
+ */
+public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class);
+    private final SourceReaderContext readerContext;
+    private final List<SourceReader<T, ? extends SourceSplit>> chainedReaders;
+    private int currentSourceIndex = -1;
+    private SourceReader<T, ? extends SourceSplit> currentReader;
+    private CompletableFuture<Void> availabilityFuture;
+
+    public HybridSourceReader(
+            SourceReaderContext readerContext,
+            List<SourceReader<T, ? extends SourceSplit>> readers) {
+        this.readerContext = readerContext;
+        this.chainedReaders = readers;
+    }
+
+    @Override
+    public void start() {
+        setCurrentReader(0);
+    }
+
+    @Override
+    public InputStatus pollNext(ReaderOutput output) throws Exception {
+        InputStatus status = currentReader.pollNext(output);
+        if (status == InputStatus.END_OF_INPUT) {
+            // trap END_OF_INPUT if this wasn't the final reader
+            LOG.info(
+                    "End of input subtask={} sourceIndex={} {}",
+                    readerContext.getIndexOfSubtask(),
+                    currentSourceIndex,
+                    currentReader);
+            if (currentSourceIndex + 1 < chainedReaders.size()) {
+                // Signal the coordinator that the current reader has consumed all input and the
+                // next source can potentially be activated (after all readers are ready).
+                readerContext.sendSourceEventToCoordinator(
+                        new SourceReaderFinishedEvent(currentSourceIndex));
+                // More data will be available from the next reader.
+                // InputStatus.NOTHING_AVAILABLE requires us to complete the availability
+                // future after source switch to resume poll.
+                return InputStatus.NOTHING_AVAILABLE;
+            }
+        }
+        return status;
+    }
+
+    @Override
+    public List<HybridSourceSplit> snapshotState(long checkpointId) {
+        List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId);

Review comment:
       Wouldn't we also need to snapshot past readers? What happens with sources that commit their offsets back? When would their final offset appear?

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. These splits may originate from a previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the previous readers. After
+ * returned splits were processed, delegation to the current underlying enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit>> assignments;
+    // Splits that have been returned due to subtask reset
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;
+    private int currentSourceIndex;
+    private SplitEnumerator<SourceSplit, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, ?> sourceChain,
+            int initialSourceIndex) {
+        Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size());
+        this.context = context;
+        this.sourceChain = sourceChain;
+        this.currentSourceIndex = initialSourceIndex;
+        this.assignments = new HashMap<>();
+        this.pendingSplits = new HashMap<>();
+        this.pendingReaders = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        // TODO: test coverage for on demand split assignment
+        assignPendingSplits(subtaskId);
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits);
+        // Splits returned can belong to multiple sources, after switching since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>();
+
+        for (HybridSourceSplit split : splits) {
+            splitsBySourceIndex
+                    .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>())
+                    .add(split);
+        }
+
+        splitsBySourceIndex.forEach(
+                (k, splitsPerSource) -> {
+                    if (k == currentSourceIndex) {
+                        currentEnumerator.addSplitsBack(
+                                HybridSourceSplit.unwrapSplits(splitsPerSource), subtaskId);
+                    } else {
+                        pendingSplits
+                                .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>())
+                                .put(k, splitsPerSource);
+                        if (context.registeredReaders().containsKey(subtaskId)) {
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("addReader subtaskId={}", subtaskId);
+        if (pendingSplits.isEmpty()) {
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex));
+            LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex);
+            currentEnumerator.addReader(subtaskId);
+        } else {
+            // Defer adding reader to the current enumerator until splits belonging to earlier
+            // enumerators that were added back have been processed
+            pendingReaders.add(subtaskId);
+            assignPendingSplits(subtaskId);
+        }

Review comment:
       These cases are not fully clear to me. The first is triggered on start and the second on regional failover? Would you please be so kind as to add a comment?

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator<SplitT extends SourceSplit>
+        implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, SplitT, Object> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments;
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;
+    private int currentSourceIndex;
+    private SplitEnumerator<SplitT, Object> currentEnumerator;
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain) {
+        this(context, sourceChain, 0);
+    }
+
+    public HybridSourceSplitEnumerator(
+            SplitEnumeratorContext<HybridSourceSplit> context,
+            HybridSource.SourceChain<?, SplitT, Object> sourceChain,
+            int initialSourceIndex) {
+        Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size());
+        this.context = context;
+        this.sourceChain = sourceChain;
+        this.currentSourceIndex = initialSourceIndex;
+        this.assignments = new HashMap<>();
+        this.pendingSplits = new HashMap<>();
+        this.pendingReaders = new HashSet<>();
+    }
+
+    @Override
+    public void start() {
+        switchEnumerator();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, String requesterHostname) {
+        LOG.debug(
+                "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}",
+                subtaskId,
+                currentSourceIndex,
+                pendingSplits);
+        // TODO: test coverage for on demand split assignment
+        assignPendingSplits(subtaskId);
+        currentEnumerator.handleSplitRequest(subtaskId, requesterHostname);
+    }
+
+    @Override
+    public void addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) {
+        LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits);
+        // Splits returned can belong to multiple sources, after switching since last checkpoint
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>();
+
+        for (HybridSourceSplit<SplitT> split : splits) {
+            splitsBySourceIndex
+                    .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>())
+                    .add(split);
+        }
+
+        splitsBySourceIndex.forEach(
+                (k, splitsPerSource) -> {
+                    if (k == currentSourceIndex) {
+                        currentEnumerator.addSplitsBack(
+                                
HybridSourceReader.unwrapSplits(splitsPerSource), subtaskId);
+                    } else {
+                        pendingSplits
+                                .computeIfAbsent(subtaskId, sourceIndex -> new 
TreeMap<>())
+                                .put(k, splitsPerSource);
+                        if 
(context.registeredReaders().containsKey(subtaskId)) {
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                });
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        LOG.debug("addReader subtaskId={}", subtaskId);
+        if (pendingSplits.isEmpty()) {
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex));
+            LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex);
+            currentEnumerator.addReader(subtaskId);
+        } else {
+            // Defer adding reader to the current enumerator until splits belonging to earlier
+            // enumerators that were added back have been processed
+            pendingReaders.add(subtaskId);
+            assignPendingSplits(subtaskId);
+        }
+    }
+
+    private void assignPendingSplits(int subtaskId) {
+        TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                pendingSplits.get(subtaskId);
+        if (splitsBySource != null) {
+            int sourceIndex = splitsBySource.firstKey();
+            List<HybridSourceSplit<SplitT>> splits =
+                    Preconditions.checkNotNull(splitsBySource.get(sourceIndex));
+            Preconditions.checkState(!splits.isEmpty());
+            LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits);
+            context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex));
+            context.assignSplits(
+                    new SplitsAssignment<HybridSourceSplit>(
+                            Collections.singletonMap(subtaskId, (List) splits)));
+            context.signalNoMoreSplits(subtaskId);
+            // Empty collection indicates that splits have been assigned
+            splits.clear();
+        }
+    }
+
+    @Override
+    public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception {
+        Object enumState = currentEnumerator.snapshotState(checkpointId);
+        return new HybridSourceEnumeratorState(currentSourceIndex, enumState);
+    }
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceReaderFinishedEvent) {
+            SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;
+            if (srfe.sourceIndex() != currentSourceIndex) {
+                if (srfe.sourceIndex() < currentSourceIndex) {
+                    // Assign pending splits if any
+                    TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource =
+                            pendingSplits.get(subtaskId);
+                    if (splitsBySource != null) {
+                        List<HybridSourceSplit<SplitT>> splits =
+                                splitsBySource.get(srfe.sourceIndex());
+                        if (splits != null && splits.isEmpty()) {
+                            // Splits have been processed by the reader
+                            splitsBySource.remove(srfe.sourceIndex());
+                        }
+                        if (splitsBySource.isEmpty()) {
+                            pendingSplits.remove(subtaskId);
+                        } else {
+                            Integer nextSubtaskSourceIndex = splitsBySource.firstKey();
+                            LOG.debug(
+                                    "Restore subtask={}, sourceIndex={}",
+                                    subtaskId,
+                                    nextSubtaskSourceIndex);
+                            context.sendEventToSourceReader(
+                                    subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex));
+                            assignPendingSplits(subtaskId);
+                        }
+                    }
+                    // Once all pending splits have been processed, add the readers to the current
+                    // enumerator, which may in turn trigger new split assignments
+                    if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) {
+                        // Advance pending readers to current enumerator
+                        LOG.debug(
+                                "Adding pending readers {} to enumerator currentSourceIndex={}",
+                                pendingReaders,
+                                currentSourceIndex);
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            context.sendEventToSourceReader(
+                                    pendingReaderSubtaskId,
+                                    new SwitchSourceEvent(currentSourceIndex));
+                        }
+                        for (int pendingReaderSubtaskId : pendingReaders) {
+                            currentEnumerator.addReader(pendingReaderSubtaskId);
+                        }
+                        pendingReaders.clear();
+                    }
+                } else {
+                    // enumerator already switched
+                    LOG.debug("Ignoring out of order event {}", srfe);
+                }
+                return;
+            }
+            this.assignments.remove(subtaskId);
+            LOG.info(
+                    "Reader finished for subtask {} remaining assignments {}",
+                    subtaskId,
+                    assignments);
+            if (this.assignments.isEmpty()) {
+                LOG.debug("No assignments remaining, ready to switch readers!");
+                if (currentSourceIndex + 1 < sourceChain.sources.size()) {
+                    switchEnumerator();
+                    // switch all readers prior to sending split assignments
+                    for (int i = 0; i < context.currentParallelism(); i++) {
+                        context.sendEventToSourceReader(
+                                i, new SwitchSourceEvent(currentSourceIndex));
+                    }
+                    // trigger split assignment,
+                    // (initially happens as part of subtask/reader registration)
+                    for (int i = 0; i < context.currentParallelism(); i++) {
+                        LOG.debug("adding reader subtask={} sourceIndex={}", i, currentSourceIndex);
+                        currentEnumerator.addReader(i);
+                    }
+                }
+            }
+        } else {
+            currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        currentEnumerator.close();
+    }
+
+    private void switchEnumerator() {
+
+        Object enumeratorState = null;
+        if (currentEnumerator != null) {
+            try {
+                enumeratorState = currentEnumerator.snapshotState(-1);

Review comment:
       Ah I think this explains my question about the state of past readers. 
Makes sense now.
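
       For context, the switch that this hunk begins could look roughly like
       the following (a sketch only: generics are simplified and
       `delegatingContext` stands in for however the hybrid enumerator adapts
       its context to the underlying source's split type; neither is the PR's
       actual code):

       ```java
       // Sketch: snapshot the outgoing enumerator, then seed the next one
       // through the optional user-supplied state converter.
       private void switchEnumerator() {
           try {
               Object previousState = null;
               if (currentEnumerator != null) {
                   // checkpointId -1 marks a switch-time snapshot, not a real checkpoint
                   previousState = currentEnumerator.snapshotState(-1);
                   currentEnumerator.close();
               }
               currentSourceIndex++;
               Source<?, SplitT, Object> next = sourceChain.sources.get(currentSourceIndex).f0;
               Function<Object, Object> converter = sourceChain.sources.get(currentSourceIndex).f1;
               currentEnumerator =
                       (previousState != null && converter != null)
                               // runtime-determined start position derived from previous state
                               ? next.restoreEnumerator(delegatingContext, converter.apply(previousState))
                               // fixed start position configured on the source
                               : next.createEnumerator(delegatingContext);
           } catch (Exception e) {
               throw new RuntimeException("Failed to switch enumerator", e);
           }
       }
       ```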

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when a source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. These splits may originate from a previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the previous readers. After
+ * the returned splits have been processed, delegation to the current underlying enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit>> assignments;
+    // Splits that have been returned due to subtask reset
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;

Review comment:
       For the non-poc, let's try to avoid specific implementations and use the 
respective interfaces in the declarations instead.
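
       Concretely, something like this sketch (interface types on the left,
       concrete types on the right; `NavigableMap` because the enumerator
       relies on `firstKey()` ordering by source index):

       ```java
       import java.util.HashMap;
       import java.util.HashSet;
       import java.util.List;
       import java.util.Map;
       import java.util.NavigableMap;
       import java.util.Set;

       // Fields declared against java.util interfaces; the call sites keep
       // creating TreeMap instances, preserving the sorted-by-source-index
       // iteration and firstKey() behavior.
       private final Map<Integer, List<HybridSourceSplit>> assignments = new HashMap<>();
       private final Map<Integer, NavigableMap<Integer, List<HybridSourceSplit>>> pendingSplits = new HashMap<>();
       private final Set<Integer> pendingReaders = new HashSet<>();
       ```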

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
+
+    private final SourceChain<T, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ?> sourceChain) {
+        Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+        for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+            Preconditions.checkArgument(
+                    Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+                    "All sources except the final source need to be bounded.");
+        }

Review comment:
       These checks should be done while the source chain is constructed / 
builder is invoked.
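
       For instance (illustrative only: `HybridSourceBuilder` and its methods
       are hypothetical, and the `HybridSource` constructor would have to
       accept the already-validated list):

       ```java
       // Hypothetical builder enforcing the boundedness invariant while the
       // chain is assembled, so the constructor check above becomes redundant.
       public static class HybridSourceBuilder<T> {
           private final List<Source<T, ?, ?>> sources = new ArrayList<>();

           public HybridSourceBuilder<T> addSource(Source<T, ?, ?> source) {
               if (!sources.isEmpty()) {
                   // the previous tail is no longer the final source, so it must be bounded
                   Source<T, ?, ?> previous = sources.get(sources.size() - 1);
                   Preconditions.checkArgument(
                           Boundedness.BOUNDED.equals(previous.getBoundedness()),
                           "All sources except the final source need to be bounded.");
               }
               sources.add(source);
               return this;
           }

           public HybridSource<T> build() {
               Preconditions.checkArgument(!sources.isEmpty(), "At least one source required");
               return new HybridSource<>(sources); // assumes a list-accepting constructor
           }
       }
       ```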

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when a source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. These splits may originate from a previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the previous readers. After
+ * the returned splits have been processed, delegation to the current underlying enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;

Review comment:
       If you go with the builder pattern, this should be a
       `List<SourceWithConverter>` of sources.
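
       Sketching that idea (only the `SourceWithConverter` name comes from
       this thread; its shape here is a guess):

       ```java
       // Hypothetical holder pairing each chained source with its optional
       // start-position converter, replacing the Tuple2-based SourceChain.
       public static class SourceWithConverter<T> implements Serializable {
           final Source<T, ?, ?> source;
           // Converts the previous enumerator's checkpoint state into the next
           // source's start position; null means use the configured fixed start.
           final Function<Object, Object> converter;

           SourceWithConverter(Source<T, ?, ?> source, Function<Object, Object> converter) {
               this.source = source;
               this.converter = converter;
           }
       }

       // ...and in HybridSourceSplitEnumerator the field becomes:
       private final List<SourceWithConverter<?>> sources;
       ```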




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

