AHeise commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r646563518
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configured source chain. */ +@PublicEvolving +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> { + + private final SourceChain<T, ?> sourceChain; + + public HybridSource(SourceChain<T, ?> sourceChain) { Review comment: We should use a builder pattern as with the other sources. Then you could promote the inner `SourceChain` class to a builder and pass the list of sources here.
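For illustration, a minimal sketch of what the suggested builder could look like. The names and the list-based `HybridSource` constructor are hypothetical, not part of this PR:

```java
// Sketch only: a possible builder that replaces the public SourceChain constructor argument.
public static class Builder<T> {
    private final List<Source<T, ? extends SourceSplit, ?>> sources = new ArrayList<>();

    /** Adds the next source in the chain; all but the last source must be bounded. */
    public Builder<T> addSource(Source<T, ? extends SourceSplit, ?> source) {
        sources.add(source);
        return this;
    }

    public HybridSource<T> build() {
        Preconditions.checkArgument(!sources.isEmpty(), "at least one source required");
        return new HybridSource<>(sources); // hypothetical list-based constructor
    }
}
```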
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configured source chain. */ +@PublicEvolving +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> { Review comment: We could rename the thing to `ConcatenatedSource(s)`, `SourceSequence`, or similar (also going to post that to the ML thread). Hybrid has the connotation of 2 for me (maybe because I'm a non-native speaker) and does not carry the concatenation concept as well (hybrid sounds to me more like the source would constantly switch back and forth). ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configured source chain.
*/ +@PublicEvolving +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> { + + private final SourceChain<T, ?> sourceChain; + + public HybridSource(SourceChain<T, ?> sourceChain) { + Preconditions.checkArgument(!sourceChain.sources.isEmpty()); + for (int i = 0; i < sourceChain.sources.size() - 1; i++) { + Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), + "All sources except the final source need to be bounded."); + } + this.sourceChain = sourceChain; + } + + @Override + public Boundedness getBoundedness() { + return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); + } + + @Override + public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + List<SourceReader<T, ? extends SourceSplit>> readers = new ArrayList<>(); + for (Tuple2<Source<T, ? extends SourceSplit, ?>, ?> source : sourceChain.sources) { + readers.add(source.f0.createReader(readerContext)); + } + return new HybridSourceReader(readerContext, readers); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext) { + return new HybridSourceSplitEnumerator(enumContext, sourceChain, 0); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext, + HybridSourceEnumeratorState checkpoint) + throws Exception { + return new HybridSourceSplitEnumerator( + enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); + } + + @Override + public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() { + List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getSplitSerializer()))); + return new HybridSourceSplitSerializer(serializers); + } + + @Override + public SimpleVersionedSerializer<HybridSourceEnumeratorState> + getEnumeratorCheckpointSerializer() { + List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer()))); + return new HybridSourceEnumeratorStateSerializer(serializers); + } + + private static <T> SimpleVersionedSerializer<T> castSerializer( + SimpleVersionedSerializer<? extends T> s) { + @SuppressWarnings("rawtypes") + SimpleVersionedSerializer s1 = s; + return s1; + } + + /** + * Converts checkpoint between sources to transfer end position to next source's start position. + * Only required for dynamic position transfer at time of switching, otherwise source can be + * preconfigured with a start position during job submission. + */ + public interface CheckpointConverter<InCheckpointT, OutCheckpointT> Review comment: Should this converter also convert in the other direction in case of a failure? Let's say you have a dynamic handover from File to Kafka 1h before NOW because Kafka has only 2h retention. You just switched to Kafka, have a checkpoint, and then a failure occurs. Now your recovery takes longer than 1h and you have to go back to the file, starting from the position that corresponds to the Kafka offset.
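For illustration, a minimal sketch of a converter that can also map positions back, assuming a hypothetical `convertBack` method; this is not part of the PR:

```java
import java.io.Serializable;
import java.util.Optional;
import java.util.function.Function;

/** Sketch only: hypothetical two-way variant of CheckpointConverter. */
public interface BidirectionalCheckpointConverter<InCheckpointT, OutCheckpointT>
        extends Function<InCheckpointT, OutCheckpointT>, Serializable {

    /**
     * Maps a position of the next source back to a start position of the previous source,
     * e.g. when Kafka offsets have expired and reading must fall back to the file source.
     * An empty result would indicate that the switch cannot be rolled back.
     */
    default Optional<InCheckpointT> convertBack(OutCheckpointT checkpoint) {
        return Optional.empty();
    }
}
```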
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configured source chain. */ +@PublicEvolving +public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> { + + private final SourceChain<T, ?> sourceChain; + + public HybridSource(SourceChain<T, ?> sourceChain) { + Preconditions.checkArgument(!sourceChain.sources.isEmpty()); + for (int i = 0; i < sourceChain.sources.size() - 1; i++) { + Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), + "All sources except the final source need to be bounded."); + } + this.sourceChain = sourceChain; + } + + @Override + public Boundedness getBoundedness() { + return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); + } + + @Override + public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) + throws Exception { + List<SourceReader<T, ? extends SourceSplit>> readers = new ArrayList<>(); + for (Tuple2<Source<T, ? 
extends SourceSplit, ?>, ?> source : sourceChain.sources) { + readers.add(source.f0.createReader(readerContext)); + } + return new HybridSourceReader(readerContext, readers); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext) { + return new HybridSourceSplitEnumerator(enumContext, sourceChain, 0); + } + + @Override + public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator( + SplitEnumeratorContext<HybridSourceSplit> enumContext, + HybridSourceEnumeratorState checkpoint) + throws Exception { + return new HybridSourceSplitEnumerator( + enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); + } + + @Override + public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() { + List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getSplitSerializer()))); + return new HybridSourceSplitSerializer(serializers); + } + + @Override + public SimpleVersionedSerializer<HybridSourceEnumeratorState> + getEnumeratorCheckpointSerializer() { + List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>(); + sourceChain.sources.forEach( + t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer()))); + return new HybridSourceEnumeratorStateSerializer(serializers); + } + + private static <T> SimpleVersionedSerializer<T> castSerializer( + SimpleVersionedSerializer<? extends T> s) { + @SuppressWarnings("rawtypes") + SimpleVersionedSerializer s1 = s; + return s1; + } + + /** + * Converts checkpoint between sources to transfer end position to next source's start position. + * Only required for dynamic position transfer at time of switching, otherwise source can be + * preconfigured with a start position during job submission. + */ + public interface CheckpointConverter<InCheckpointT, OutCheckpointT> + extends Function<InCheckpointT, OutCheckpointT>, Serializable {} + + /** Chain of sources with option to convert start position at switch-time. */ + public static class SourceChain<T, EnumChkT> implements Serializable { + final List<Tuple2<Source<T, ? extends SourceSplit, ?>, CheckpointConverter<?, ?>>> sources; Review comment: In the non-poc, we should replace Tuple2 with some inner record type.
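For illustration, a hedged sketch of such an inner record type; the name and accessors are hypothetical:

```java
/** Sketch only: possible replacement for the Tuple2 entries of SourceChain. */
public static class SourceChainEntry<T> implements Serializable {
    private final Source<T, ? extends SourceSplit, ?> source;
    private final CheckpointConverter<?, ?> converter; // null when the start position is fixed

    SourceChainEntry(
            Source<T, ? extends SourceSplit, ?> source, CheckpointConverter<?, ?> converter) {
        this.source = source;
        this.converter = converter;
    }

    Source<T, ? extends SourceSplit, ?> source() {
        return source;
    }

    CheckpointConverter<?, ?> converter() {
        return converter;
    }
}
```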
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator.
+ */ +public class HybridSourceSplitEnumerator<SplitT extends SourceSplit> + implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final HybridSource.SourceChain<?, SplitT, Object> sourceChain; + // TODO: SourceCoordinatorContext does not provide access to current assignments + private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments; + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits; + private final HashSet<Integer> pendingReaders; + private int currentSourceIndex; + private SplitEnumerator<SplitT, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain) { + this(context, sourceChain, 0); + } + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain, + int initialSourceIndex) { + Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); + this.context = context; + this.sourceChain = sourceChain; + this.currentSourceIndex = initialSourceIndex; + this.assignments = new HashMap<>(); + this.pendingSplits = new HashMap<>(); + this.pendingReaders = new HashSet<>(); + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + // TODO: test coverage for on demand split assignment + assignPendingSplits(subtaskId); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits); + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>(); + + for (HybridSourceSplit<SplitT> split : splits) { + splitsBySourceIndex + .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>()) + .add(split); + } + + splitsBySourceIndex.forEach( + (k, splitsPerSource) -> { + if (k == currentSourceIndex) { + currentEnumerator.addSplitsBack( + HybridSourceReader.unwrapSplits(splitsPerSource), subtaskId); + } else { + pendingSplits + .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>()) + .put(k, splitsPerSource); + if (context.registeredReaders().containsKey(subtaskId)) { + assignPendingSplits(subtaskId); + } + } + }); + } + + @Override + public void addReader(int subtaskId) { + LOG.debug("addReader subtaskId={}", subtaskId); + if (pendingSplits.isEmpty()) { + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex)); + LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex); + currentEnumerator.addReader(subtaskId); + } else { + // Defer adding reader to the current enumerator until splits belonging to earlier + // enumerators that were added back have been processed + pendingReaders.add(subtaskId); + assignPendingSplits(subtaskId); + } + } + + private void assignPendingSplits(int subtaskId) { + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> 
splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + int sourceIndex = splitsBySource.firstKey(); + List<HybridSourceSplit<SplitT>> splits = + Preconditions.checkNotNull(splitsBySource.get(sourceIndex)); + Preconditions.checkState(!splits.isEmpty()); + LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits); + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex)); + context.assignSplits( + new SplitsAssignment<HybridSourceSplit>( + Collections.singletonMap(subtaskId, (List) splits))); + context.signalNoMoreSplits(subtaskId); + // Empty collection indicates that splits have been assigned + splits.clear(); + } + } + + @Override + public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception { + Object enumState = currentEnumerator.snapshotState(checkpointId); + return new HybridSourceEnumeratorState(currentSourceIndex, enumState); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SourceReaderFinishedEvent) { + SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent; + if (srfe.sourceIndex() != currentSourceIndex) { + if (srfe.sourceIndex() < currentSourceIndex) { + // Assign pending splits if any + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + List<HybridSourceSplit<SplitT>> splits = + splitsBySource.get(srfe.sourceIndex()); + if (splits != null && splits.isEmpty()) { + // Splits have been processed by the reader + splitsBySource.remove(srfe.sourceIndex()); + } + if (splitsBySource.isEmpty()) { + pendingSplits.remove(subtaskId); + } else { + Integer nextSubtaskSourceIndex = splitsBySource.firstKey(); + LOG.debug( + "Restore subtask={}, sourceIndex={}", + subtaskId, + nextSubtaskSourceIndex); + context.sendEventToSourceReader( + subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex)); + assignPendingSplits(subtaskId); + } + } + // Once all pending splits have been processed, add the readers to the current + // enumerator, which may in turn trigger new split assignments + if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) { + // Advance pending readers to current enumerator + LOG.debug( + "Adding pending readers {} to enumerator currentSourceIndex={}", + pendingReaders, + currentSourceIndex); + for (int pendingReaderSubtaskId : pendingReaders) { + context.sendEventToSourceReader( + pendingReaderSubtaskId, + new SwitchSourceEvent(currentSourceIndex)); + } + for (int pendingReaderSubtaskId : pendingReaders) { + currentEnumerator.addReader(pendingReaderSubtaskId); + } + pendingReaders.clear(); + } + } else { + // enumerator already switched + LOG.debug("Ignoring out of order event {}", srfe); + } + return; + } Review comment: For such a long method, I'd prefer more `return` statements to keep the nesting down. Or simply extract methods.
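For illustration, a sketch of how early returns could flatten `handleSourceEvent`; the extracted helper names are hypothetical:

```java
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    if (!(sourceEvent instanceof SourceReaderFinishedEvent)) {
        currentEnumerator.handleSourceEvent(subtaskId, sourceEvent);
        return;
    }
    SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent;
    if (srfe.sourceIndex() > currentSourceIndex) {
        // enumerator already switched
        LOG.debug("Ignoring out of order event {}", srfe);
        return;
    }
    if (srfe.sourceIndex() < currentSourceIndex) {
        // hypothetical helper containing the pending-splits/pending-readers handling above
        handleFinishedReaderOfPreviousSource(subtaskId, srfe);
        return;
    }
    // hypothetical helper containing the switch-when-all-assignments-finished logic
    handleFinishedReaderOfCurrentSource(subtaskId);
}
```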
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. */ +public class HybridSourceEnumeratorStateSerializer Review comment: Do we need to store which sources are currently in the hybrid source? What happens if a user adds/removes a source before recovering? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * <p>This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * <p>When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); + private final SourceReaderContext readerContext; + private final List<SourceReader<T, ?
extends SourceSplit>> chainedReaders; + private int currentSourceIndex = -1; + private SourceReader<T, ? extends SourceSplit> currentReader; + private CompletableFuture<Void> availabilityFuture; + + public HybridSourceReader( + SourceReaderContext readerContext, + List<SourceReader<T, ? extends SourceSplit>> readers) { + this.readerContext = readerContext; + this.chainedReaders = readers; + } + + @Override + public void start() { + setCurrentReader(0); Review comment: Is it intended that we always start at 0 even after HA failover? Do we quickly go over already processed sub-sources then? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
These splits may originate from a previous enumerator that + * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to + * the current enumerator and replay the returned splits by activating the previous readers. After + * returned splits were processed, delegation to the current underlying enumerator resumes. + */ +public class HybridSourceSplitEnumerator + implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final HybridSource.SourceChain<?, ?> sourceChain; + // TODO: SourceCoordinatorContext does not provide access to current assignments + private final Map<Integer, List<HybridSourceSplit>> assignments; + // Splits that have been returned due to subtask reset + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits; + private final HashSet<Integer> pendingReaders; + private int currentSourceIndex; + private SplitEnumerator<SourceSplit, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, ?> sourceChain, + int initialSourceIndex) { + Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); + this.context = context; + this.sourceChain = sourceChain; + this.currentSourceIndex = initialSourceIndex; + this.assignments = new HashMap<>(); + this.pendingSplits = new HashMap<>(); + this.pendingReaders = new HashSet<>(); + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + // TODO: test coverage for on demand split assignment + assignPendingSplits(subtaskId); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits); + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>(); Review comment: Aren't we always polling the splits from the first source? Could we use some kind of `PriorityQueue`? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * <p>This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * <p>When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader<T> implements SourceReader<T, HybridSourceSplit> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); + private final SourceReaderContext readerContext; + private final List<SourceReader<T, ? extends SourceSplit>> chainedReaders; + private int currentSourceIndex = -1; + private SourceReader<T, ? extends SourceSplit> currentReader; + private CompletableFuture<Void> availabilityFuture; + + public HybridSourceReader( + SourceReaderContext readerContext, + List<SourceReader<T, ? extends SourceSplit>> readers) { + this.readerContext = readerContext; + this.chainedReaders = readers; + } + + @Override + public void start() { + setCurrentReader(0); + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + InputStatus status = currentReader.pollNext(output); + if (status == InputStatus.END_OF_INPUT) { + // trap END_OF_INPUT if this wasn't the final reader + LOG.info( + "End of input subtask={} sourceIndex={} {}", + readerContext.getIndexOfSubtask(), + currentSourceIndex, + currentReader); + if (currentSourceIndex + 1 < chainedReaders.size()) { + // Signal the coordinator that the current reader has consumed all input and the + // next source can potentially be activated (after all readers are ready). + readerContext.sendSourceEventToCoordinator( + new SourceReaderFinishedEvent(currentSourceIndex)); + // More data will be available from the next reader. + // InputStatus.NOTHING_AVAILABLE requires us to complete the availability + // future after source switch to resume poll. + return InputStatus.NOTHING_AVAILABLE; + } + } + return status; + } + + @Override + public List<HybridSourceSplit> snapshotState(long checkpointId) { + List<? extends SourceSplit> state = currentReader.snapshotState(checkpointId); Review comment: Wouldn't we also need to snapshot past readers? What happens with sources that commit their offsets back? When would their final offset appear? ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. These splits may originate from a previous enumerator that + * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to + * the current enumerator and replay the returned splits by activating the previous readers. After + * returned splits were processed, delegation to the current underlying enumerator resumes. 
*/ +public class HybridSourceSplitEnumerator + implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final HybridSource.SourceChain<?, ?> sourceChain; + // TODO: SourceCoordinatorContext does not provide access to current assignments + private final Map<Integer, List<HybridSourceSplit>> assignments; + // Splits that have been returned due to subtask reset + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits; + private final HashSet<Integer> pendingReaders; + private int currentSourceIndex; + private SplitEnumerator<SourceSplit, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, ?> sourceChain, + int initialSourceIndex) { + Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); + this.context = context; + this.sourceChain = sourceChain; + this.currentSourceIndex = initialSourceIndex; + this.assignments = new HashMap<>(); + this.pendingSplits = new HashMap<>(); + this.pendingReaders = new HashSet<>(); + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + // TODO: test coverage for on demand split assignment + assignPendingSplits(subtaskId); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void addSplitsBack(List<HybridSourceSplit> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits); + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit>> splitsBySourceIndex = new TreeMap<>(); + + for (HybridSourceSplit split : splits) { + splitsBySourceIndex + .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>()) + .add(split); + } + + splitsBySourceIndex.forEach( + (k, splitsPerSource) -> { + if (k == currentSourceIndex) { + currentEnumerator.addSplitsBack( + HybridSourceSplit.unwrapSplits(splitsPerSource), subtaskId); + } else { + pendingSplits + .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>()) + .put(k, splitsPerSource); + if (context.registeredReaders().containsKey(subtaskId)) { + assignPendingSplits(subtaskId); + } + } + }); + } + + @Override + public void addReader(int subtaskId) { + LOG.debug("addReader subtaskId={}", subtaskId); + if (pendingSplits.isEmpty()) { + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex)); + LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex); + currentEnumerator.addReader(subtaskId); + } else { + // Defer adding reader to the current enumerator until splits belonging to earlier + // enumerators that were added back have been processed + pendingReaders.add(subtaskId); + assignPendingSplits(subtaskId); + } Review comment: These cases are not fully clear to me. Is the first triggered on start and the second on regional failover? Would you please be so kind as to add a comment?
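For illustration, the two branches with the kind of comments being asked for, assuming the start/regional-failover reading above is correct:

```java
@Override
public void addReader(int subtaskId) {
    LOG.debug("addReader subtaskId={}", subtaskId);
    if (pendingSplits.isEmpty()) {
        // Common path (e.g. initial start): no splits were added back, so the reader
        // can join the current enumerator right away.
        context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex));
        currentEnumerator.addReader(subtaskId);
    } else {
        // Recovery path (e.g. regional failover): splits assigned since the last checkpoint
        // were added back and may belong to earlier sources; replay them before the reader
        // joins the current enumerator.
        pendingReaders.add(subtaskId);
        assignPendingSplits(subtaskId);
    }
}
```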
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. 
+ */ +public class HybridSourceSplitEnumerator<SplitT extends SourceSplit> + implements SplitEnumerator<HybridSourceSplit<SplitT>, HybridSourceEnumeratorState> { + private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + + private final SplitEnumeratorContext<HybridSourceSplit> context; + private final HybridSource.SourceChain<?, SplitT, Object> sourceChain; + // TODO: SourceCoordinatorContext does not provide access to current assignments + private final Map<Integer, List<HybridSourceSplit<SplitT>>> assignments; + private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit<SplitT>>>> pendingSplits; + private final HashSet<Integer> pendingReaders; + private int currentSourceIndex; + private SplitEnumerator<SplitT, Object> currentEnumerator; + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain) { + this(context, sourceChain, 0); + } + + public HybridSourceSplitEnumerator( + SplitEnumeratorContext<HybridSourceSplit> context, + HybridSource.SourceChain<?, SplitT, Object> sourceChain, + int initialSourceIndex) { + Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); + this.context = context; + this.sourceChain = sourceChain; + this.currentSourceIndex = initialSourceIndex; + this.assignments = new HashMap<>(); + this.pendingSplits = new HashMap<>(); + this.pendingReaders = new HashSet<>(); + } + + @Override + public void start() { + switchEnumerator(); + } + + @Override + public void handleSplitRequest(int subtaskId, String requesterHostname) { + LOG.debug( + "handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", + subtaskId, + currentSourceIndex, + pendingSplits); + // TODO: test coverage for on demand split assignment + assignPendingSplits(subtaskId); + currentEnumerator.handleSplitRequest(subtaskId, requesterHostname); + } + + @Override + public void addSplitsBack(List<HybridSourceSplit<SplitT>> splits, int subtaskId) { + LOG.debug("Adding splits back for subtask={} {}", subtaskId, splits); + // Splits returned can belong to multiple sources, after switching since last checkpoint + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySourceIndex = new TreeMap<>(); + + for (HybridSourceSplit<SplitT> split : splits) { + splitsBySourceIndex + .computeIfAbsent(split.sourceIndex(), k -> new ArrayList<>()) + .add(split); + } + + splitsBySourceIndex.forEach( + (k, splitsPerSource) -> { + if (k == currentSourceIndex) { + currentEnumerator.addSplitsBack( + HybridSourceReader.unwrapSplits(splitsPerSource), subtaskId); + } else { + pendingSplits + .computeIfAbsent(subtaskId, sourceIndex -> new TreeMap<>()) + .put(k, splitsPerSource); + if (context.registeredReaders().containsKey(subtaskId)) { + assignPendingSplits(subtaskId); + } + } + }); + } + + @Override + public void addReader(int subtaskId) { + LOG.debug("addReader subtaskId={}", subtaskId); + if (pendingSplits.isEmpty()) { + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(currentSourceIndex)); + LOG.debug("Adding reader {} to enumerator {}", subtaskId, currentSourceIndex); + currentEnumerator.addReader(subtaskId); + } else { + // Defer adding reader to the current enumerator until splits belonging to earlier + // enumerators that were added back have been processed + pendingReaders.add(subtaskId); + assignPendingSplits(subtaskId); + } + } + + private void assignPendingSplits(int subtaskId) { + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> 
splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + int sourceIndex = splitsBySource.firstKey(); + List<HybridSourceSplit<SplitT>> splits = + Preconditions.checkNotNull(splitsBySource.get(sourceIndex)); + Preconditions.checkState(!splits.isEmpty()); + LOG.debug("Assigning pending splits subtask={} {}", subtaskId, splits); + context.sendEventToSourceReader(subtaskId, new SwitchSourceEvent(sourceIndex)); + context.assignSplits( + new SplitsAssignment<HybridSourceSplit>( + Collections.singletonMap(subtaskId, (List) splits))); + context.signalNoMoreSplits(subtaskId); + // Empty collection indicates that splits have been assigned + splits.clear(); + } + } + + @Override + public HybridSourceEnumeratorState snapshotState(long checkpointId) throws Exception { + Object enumState = currentEnumerator.snapshotState(checkpointId); + return new HybridSourceEnumeratorState(currentSourceIndex, enumState); + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + if (sourceEvent instanceof SourceReaderFinishedEvent) { + SourceReaderFinishedEvent srfe = (SourceReaderFinishedEvent) sourceEvent; + if (srfe.sourceIndex() != currentSourceIndex) { + if (srfe.sourceIndex() < currentSourceIndex) { + // Assign pending splits if any + TreeMap<Integer, List<HybridSourceSplit<SplitT>>> splitsBySource = + pendingSplits.get(subtaskId); + if (splitsBySource != null) { + List<HybridSourceSplit<SplitT>> splits = + splitsBySource.get(srfe.sourceIndex()); + if (splits != null && splits.isEmpty()) { + // Splits have been processed by the reader + splitsBySource.remove(srfe.sourceIndex()); + } + if (splitsBySource.isEmpty()) { + pendingSplits.remove(subtaskId); + } else { + Integer nextSubtaskSourceIndex = splitsBySource.firstKey(); + LOG.debug( + "Restore subtask={}, sourceIndex={}", + subtaskId, + nextSubtaskSourceIndex); + context.sendEventToSourceReader( + subtaskId, new SwitchSourceEvent(nextSubtaskSourceIndex)); + assignPendingSplits(subtaskId); + } + } + // Once all pending splits have been processed, add the readers to the current + // enumerator, which may in turn trigger new split assignments + if (!pendingReaders.isEmpty() && pendingSplits.isEmpty()) { + // Advance pending readers to current enumerator + LOG.debug( + "Adding pending readers {} to enumerator currentSourceIndex={}", + pendingReaders, + currentSourceIndex); + for (int pendingReaderSubtaskId : pendingReaders) { + context.sendEventToSourceReader( + pendingReaderSubtaskId, + new SwitchSourceEvent(currentSourceIndex)); + } + for (int pendingReaderSubtaskId : pendingReaders) { + currentEnumerator.addReader(pendingReaderSubtaskId); + } + pendingReaders.clear(); + } + } else { + // enumerator already switched + LOG.debug("Ignoring out of order event {}", srfe); + } + return; + } + this.assignments.remove(subtaskId); + LOG.info( + "Reader finished for subtask {} remaining assignments {}", + subtaskId, + assignments); + if (this.assignments.isEmpty()) { + LOG.debug("No assignments remaining, ready to switch readers!"); + if (currentSourceIndex + 1 < sourceChain.sources.size()) { + switchEnumerator(); + // switch all readers prior to sending split assignments + for (int i = 0; i < context.currentParallelism(); i++) { + context.sendEventToSourceReader( + i, new SwitchSourceEvent(currentSourceIndex)); + } + // trigger split assignment, + // (initially happens as part of subtask/reader registration) + for (int i = 0; i < context.currentParallelism(); i++) { + LOG.debug("adding reader subtask={} 
sourceIndex={}", i, currentSourceIndex); + currentEnumerator.addReader(i); + } + } + } + } else { + currentEnumerator.handleSourceEvent(subtaskId, sourceEvent); + } + } + + @Override + public void close() throws IOException { + currentEnumerator.close(); + } + + private void switchEnumerator() { + + Object enumeratorState = null; + if (currentEnumerator != null) { + try { + enumeratorState = currentEnumerator.snapshotState(-1); Review comment: Ah I think this explains my question about the state of past readers. Makes sense now. ########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * <p>This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########

@@ -0,0 +1,400 @@
[Apache license header, package declaration, and imports omitted; identical to the earlier quotes of this file.]
+/**
+ * Wraps the actual split enumerators and facilitates source switching. Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * <p>This enumerator delegates to the current underlying split enumerator and transitions to the
+ * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * <p>Switching between enumerators occurs by either creating the new enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the
+ * previous enumerator and the optional user supplied checkpoint state converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * <p>During subtask recovery, splits that have been assigned since the last checkpoint will be
+ * added back by the source coordinator. These splits may originate from a previous enumerator that
+ * is no longer active. In that case {@link HybridSourceSplitEnumerator} will suspend forwarding to
+ * the current enumerator and replay the returned splits by activating the previous readers. After
+ * returned splits were processed, delegation to the current underlying enumerator resumes.
+ */
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;
+    // TODO: SourceCoordinatorContext does not provide access to current assignments
+    private final Map<Integer, List<HybridSourceSplit>> assignments;
+    // Splits that have been returned due to subtask reset
+    private final Map<Integer, TreeMap<Integer, List<HybridSourceSplit>>> pendingSplits;
+    private final HashSet<Integer> pendingReaders;

Review comment:
    For the non-PoC version, let's try to avoid specific implementations and use the respective interfaces in the declarations instead.
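To make the suggestion concrete, here is a minimal sketch of the same fields typed against the collection interfaces (`Map`, `NavigableMap`, `Set`), with concrete types appearing only where the collections are instantiated. This is illustrative only; it assumes the PR's `HybridSourceSplit` type is on the classpath, and `EnumeratorFieldsSketch`/`addPendingSplit` are hypothetical names:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.Set;
    import java.util.TreeMap;

    class EnumeratorFieldsSketch {
        // NavigableMap keeps the firstKey()/ordered-traversal operations the code
        // above relies on, without pinning the declaration to TreeMap.
        private final Map<Integer, List<HybridSourceSplit>> assignments = new HashMap<>();
        private final Map<Integer, NavigableMap<Integer, List<HybridSourceSplit>>> pendingSplits =
                new HashMap<>();
        private final Set<Integer> pendingReaders = new HashSet<>();

        // Concrete implementations are chosen only at the point of creation:
        void addPendingSplit(int subtaskId, int sourceIndex, HybridSourceSplit split) {
            pendingSplits
                    .computeIfAbsent(subtaskId, k -> new TreeMap<>())
                    .computeIfAbsent(sourceIndex, k -> new ArrayList<>())
                    .add(split);
        }
    }

Callers then depend only on the interface contracts, which leaves room to swap in a different ordered map or set implementation later without touching the declarations.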
##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########

@@ -0,0 +1,160 @@
[Apache license header, package declaration, and imports omitted; identical to the earlier quotes of this file.]
+/** Hybrid source that switches underlying sources based on configured source chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {
+
+    private final SourceChain<T, ?> sourceChain;
+
+    public HybridSource(SourceChain<T, ?> sourceChain) {
+        Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+        for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+            Preconditions.checkArgument(
+                    Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+                    "All sources except the final source need to be bounded.");
+        }

Review comment:
    These checks should be done when the source chain is constructed / the builder is invoked.
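A rough sketch of how the boundedness check could move into a builder, so that an invalid chain fails fast at construction time rather than in the constructor. `HybridSourceBuilder` and `addSource` are hypothetical names, and the list-based `HybridSource` constructor at the end is assumed, not part of this PR:

    import org.apache.flink.api.connector.source.Boundedness;
    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.util.Preconditions;

    import java.util.ArrayList;
    import java.util.List;

    public class HybridSourceBuilder<T> {
        private final List<Source<T, ?, ?>> sources = new ArrayList<>();

        public HybridSourceBuilder<T> addSource(Source<T, ?, ?> source) {
            if (!sources.isEmpty()) {
                // The previously added source is no longer the final one, so it must be bounded.
                Preconditions.checkArgument(
                        Boundedness.BOUNDED.equals(
                                sources.get(sources.size() - 1).getBoundedness()),
                        "All sources except the final source need to be bounded.");
            }
            sources.add(source);
            return this;
        }

        public HybridSource<T> build() {
            Preconditions.checkArgument(!sources.isEmpty(), "At least one source is required.");
            return new HybridSource<>(sources); // assumes a (hypothetical) list-based constructor
        }
    }

Usage would then read e.g. `new HybridSourceBuilder<String>().addSource(fileSource).addSource(kafkaSource).build()`, and adding anything after an unbounded source is rejected immediately at the `addSource` call.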
##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##########

@@ -0,0 +1,400 @@
[Apache license header, imports, and class Javadoc omitted; identical to the earlier quote of this file.]
+public class HybridSourceSplitEnumerator
+        implements SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> {
+    private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+    private final SplitEnumeratorContext<HybridSourceSplit> context;
+    private final HybridSource.SourceChain<?, ?> sourceChain;

Review comment:
    If you go with the builder pattern, this should be a `List<SourceWithConverter>` of sources.
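As a sketch of that idea, each chain entry could be a small dedicated holder type instead of a `Tuple2`, and the enumerator would then hold a `List<SourceWithConverter<?>>` rather than the inner `SourceChain`. `SourceWithConverter` is only the name floated in this review; its shape below is speculative:

    import org.apache.flink.api.connector.source.Source;

    import javax.annotation.Nullable;

    import java.io.Serializable;
    import java.util.function.Function;

    /** Pairs a source with the optional converter for the previous enumerator's state. */
    public class SourceWithConverter<T> implements Serializable {
        private final Source<T, ?, ?> source;
        @Nullable private final Function<Object, Object> stateConverter;

        public SourceWithConverter(
                Source<T, ?, ?> source, @Nullable Function<Object, Object> stateConverter) {
            this.source = source;
            this.stateConverter = stateConverter;
        }

        public Source<T, ?, ?> source() {
            return source;
        }

        @Nullable
        public Function<Object, Object> stateConverter() {
            return stateConverter;
        }
    }

The field above would then become `private final List<SourceWithConverter<?>> sources;`, which also drops the enumerator's dependency on `HybridSource.SourceChain`.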
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]