AHeise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r649078298
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
+ extends Serializable {
+ SourceT configure(SourceT source, FromEnumT enumerator);
+ }
+
+ private static class NoopSourceConfigurer<
Review comment:
We should probably use a singleton here.
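
A minimal sketch of what a singleton could look like. The `SourceConfigurer` below is a stand-in without the Flink type bounds so that the snippet compiles on its own, and the `noop()` accessor is a hypothetical name, not something from the PR. Using an enum is one option; it also keeps the single instance intact across Java serialization.

```java
import java.io.Serializable;

final class NoopConfigurerSketch {
    /** Stand-in for the PR's SourceConfigurer; the Flink type bounds are dropped here. */
    interface SourceConfigurer<SourceT, FromEnumT> extends Serializable {
        SourceT configure(SourceT source, FromEnumT enumerator);
    }

    /** Stateless no-op configurer; the enum guarantees a single instance per JVM. */
    private enum NoopSourceConfigurer implements SourceConfigurer<Object, Object> {
        INSTANCE;

        @Override
        public Object configure(Object source, Object enumerator) {
            return source;
        }
    }

    /** Hypothetical typed accessor. The cast is safe because configure() returns its argument. */
    @SuppressWarnings("unchecked")
    static <SourceT, FromEnumT> SourceConfigurer<SourceT, FromEnumT> noop() {
        return (SourceConfigurer<SourceT, FromEnumT>)
                (SourceConfigurer<?, ?>) NoopSourceConfigurer.INSTANCE;
    }
}
```

Call sites would then use `noop()` instead of `new NoopSourceConfigurer<>()`.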
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
+ extends Serializable {
+ SourceT configure(SourceT source, FromEnumT enumerator);
+ }
+
+ private static class NoopSourceConfigurer<
+ SourceT extends Source, FromEnumT extends SplitEnumerator>
+ implements SourceConfigurer<SourceT, FromEnumT> {
+ @Override
+ public SourceT configure(SourceT source, FromEnumT enumerator) {
+ return source;
+ }
+ }
- /** Chain of sources with option to convert start position at switch-time.
*/
- public static class SourceChain<T, EnumChkT> implements Serializable {
- final List<Tuple2<Source<T, ? extends SourceSplit, ?>,
CheckpointConverter<?, ?>>> sources;
+ /** Entry for list of underlying sources. */
+ protected static class SourceListEntry implements Serializable {
+ protected final Source source;
+ protected final SourceConfigurer configurer;
- public SourceChain(Source<T, ?, EnumChkT> initialSource) {
- this(concat(Collections.emptyList(), Tuple2.of(initialSource,
null)));
+ private SourceListEntry(Source source, SourceConfigurer configurer) {
+ this.source = source;
+ this.configurer = configurer;
}
- /** Construct a chain of homogeneous sources with fixed start
position. */
- public static <T, EnumChkT> SourceChain<T, EnumChkT> of(Source<T, ?,
EnumChkT>... sources) {
- Preconditions.checkArgument(sources.length >= 1, "At least one
source");
- SourceChain<T, EnumChkT> sourceChain = new
SourceChain<>(sources[0]);
- for (int i = 1; i < sources.length; i++) {
- sourceChain = sourceChain.add(sources[i]);
- }
- return sourceChain;
+ public static SourceListEntry of(Source source, SourceConfigurer
configurer) {
+ return new SourceListEntry(source, configurer);
}
+ }
- private SourceChain(
- List<Tuple2<Source<T, ? extends SourceSplit, ?>,
CheckpointConverter<?, ?>>>
- sources) {
+ /** Builder for HybridSource. */
+ public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
+ implements Serializable {
+ private final List<SourceListEntry> sources;
+
+ public HybridSourceBuilder(Source<T, ?, ?> initialSource) {
+ this(concat(Collections.emptyList(),
SourceListEntry.of(initialSource, null)));
+ }
+
+ private HybridSourceBuilder(List<SourceListEntry> sources) {
this.sources = sources;
}
- /** Add source with fixed start position. */
- public <NextEnumChkT> SourceChain<T, NextEnumChkT> add(
- Source<T, ?, NextEnumChkT> nextSource) {
- return add(nextSource, null);
+ /** Add source without switch time modification. */
+ public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T,
?, ?>>
+ HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source) {
+ return addSource(source, new NoopSourceConfigurer<>());
+ }
+
+ /** Add source with start position conversion from previous
enumerator. */
+ public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T,
?, ?>>
+ HybridSourceBuilder<T, ToEnumT> addSource(
+ NextSourceT source, SourceConfigurer<NextSourceT,
EnumT> sourceConfigurer) {
+ return new HybridSourceBuilder<>(
Review comment:
Here we could also just add to `sources` and return `this`, cast to the new
builder type (probably via a raw-type cast). Then we can also avoid the `concat`
and simply use a mutable list inside a mutable builder.
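
Roughly what I have in mind, as a sketch only. `Source`, `SourceConfigurer`, `Entry`, `FileSource` and `LogSource` are simplified stand-ins so that the snippet compiles without Flink; only the shape of `addSource` matters.

```java
import java.util.ArrayList;
import java.util.List;

final class MutableBuilderSketch {
    /** Stand-in for Flink's Source, reduced to the enumerator type parameter. */
    interface Source<EnumT> {}

    /** Stand-in for the PR's SourceConfigurer. */
    interface SourceConfigurer<SourceT, FromEnumT> {
        SourceT configure(SourceT source, FromEnumT enumerator);
    }

    /** Hypothetical sources used only to exercise the builder. */
    static final class FileSource implements Source<String> {}

    static final class LogSource implements Source<Integer> {}

    static final class Entry {
        final Source<?> source;
        final SourceConfigurer<?, ?> configurer;

        Entry(Source<?> source, SourceConfigurer<?, ?> configurer) {
            this.source = source;
            this.configurer = configurer;
        }
    }

    static final class Builder<EnumT> {
        private static final SourceConfigurer<Object, Object> NOOP =
                (source, enumerator) -> source;

        // Mutable list owned by the builder; no list copy per added source.
        private final List<Entry> sources = new ArrayList<>();

        Builder(Source<EnumT> initialSource) {
            sources.add(new Entry(initialSource, NOOP));
        }

        /** Appends in place and returns this builder, re-typed to the new enumerator type. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        <ToEnumT, NextSourceT extends Source<ToEnumT>> Builder<ToEnumT> addSource(
                NextSourceT source, SourceConfigurer<NextSourceT, EnumT> configurer) {
            sources.add(new Entry(source, configurer));
            // EnumT is only a compile-time marker, so the raw cast does not change behavior.
            return (Builder<ToEnumT>) (Builder) this;
        }

        int size() {
            return sources.size();
        }
    }
}
```

The trade-off is that the builder is no longer immutable, so an intermediate builder must not be reused after `addSource` has been called on it.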
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configured source
chain. */
+@PublicEvolving
+public class HybridSource<T> implements Source<T, HybridSourceSplit,
HybridSourceEnumeratorState> {
Review comment:
Seems like I'm the only one who associates something different with "Hybrid",
so we should just keep the name as is.
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -78,9 +89,14 @@ public InputStatus pollNext(ReaderOutput output) throws
Exception {
// next source can potentially be activated (after all readers
are ready).
readerContext.sendSourceEventToCoordinator(
new SourceReaderFinishedEvent(currentSourceIndex));
- // More data will be available from the next reader.
+ if (!pendingSplits.isEmpty()) {
+ // we have splits for another reader waiting
+ setCurrentReader(pendingSplits.peek().sourceIndex());
+ return InputStatus.MORE_AVAILABLE;
+ }
+ // More splits may arrive for this or subsequent reader.
// InputStatus.NOTHING_AVAILABLE requires us to complete the
availability
- // future after source switch to resume poll.
+ // future after receiving more splits to resume poll.
return InputStatus.NOTHING_AVAILABLE;
Review comment:
To simplify, just set `currentReader = null` and `return
InputStatus.MORE_AVAILABLE`. The next invocation then takes care of the rest.
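
As a rough, self-contained model of that control flow (not the actual reader): `Split`, `addSplit` and `output` are hypothetical stand-ins, `InputStatus` is reduced to the two values used here, and sending the `SourceReaderFinishedEvent` is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class PollNextSketch {
    /** Stand-in for Flink's InputStatus, reduced to the values used in this sketch. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE }

    /** Hypothetical split: the records one underlying reader would emit. */
    static final class Split {
        final Deque<String> records;

        Split(String... records) {
            this.records = new ArrayDeque<>(Arrays.asList(records));
        }
    }

    private final Deque<Split> pendingSplits = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private Split currentReader; // stands in for the active underlying reader

    void addSplit(Split split) {
        pendingSplits.add(split);
    }

    List<String> output() {
        return output;
    }

    InputStatus pollNext() {
        if (currentReader == null) {
            if (pendingSplits.isEmpty()) {
                // no underlying reader before split assignment
                return InputStatus.NOTHING_AVAILABLE;
            }
            currentReader = pendingSplits.poll();
        }
        String record = currentReader.records.poll();
        if (record != null) {
            output.add(record);
            return InputStatus.MORE_AVAILABLE;
        }
        // Current reader is exhausted: clear it and let the next call take the null branch.
        currentReader = null;
        return InputStatus.MORE_AVAILABLE;
    }
}
```

This costs one extra `pollNext` call per switch, but the decision between "pending splits" and "nothing available" then lives in a single place.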
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
}
@Override
public InputStatus pollNext(ReaderOutput output) throws Exception {
Review comment:
Do we want to support a bounded hybrid? Currently, there is no
`END_OF_INPUT` return value...
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
+ extends Serializable {
+ SourceT configure(SourceT source, FromEnumT enumerator);
+ }
+
+ private static class NoopSourceConfigurer<
+ SourceT extends Source, FromEnumT extends SplitEnumerator>
+ implements SourceConfigurer<SourceT, FromEnumT> {
+ @Override
+ public SourceT configure(SourceT source, FromEnumT enumerator) {
+ return source;
+ }
+ }
- /** Chain of sources with option to convert start position at switch-time.
*/
- public static class SourceChain<T, EnumChkT> implements Serializable {
- final List<Tuple2<Source<T, ? extends SourceSplit, ?>,
CheckpointConverter<?, ?>>> sources;
+ /** Entry for list of underlying sources. */
+ protected static class SourceListEntry implements Serializable {
+ protected final Source source;
+ protected final SourceConfigurer configurer;
- public SourceChain(Source<T, ?, EnumChkT> initialSource) {
- this(concat(Collections.emptyList(), Tuple2.of(initialSource,
null)));
+ private SourceListEntry(Source source, SourceConfigurer configurer) {
+ this.source = source;
+ this.configurer = configurer;
}
- /** Construct a chain of homogeneous sources with fixed start
position. */
- public static <T, EnumChkT> SourceChain<T, EnumChkT> of(Source<T, ?,
EnumChkT>... sources) {
- Preconditions.checkArgument(sources.length >= 1, "At least one
source");
- SourceChain<T, EnumChkT> sourceChain = new
SourceChain<>(sources[0]);
- for (int i = 1; i < sources.length; i++) {
- sourceChain = sourceChain.add(sources[i]);
- }
- return sourceChain;
+ public static SourceListEntry of(Source source, SourceConfigurer
configurer) {
+ return new SourceListEntry(source, configurer);
}
+ }
- private SourceChain(
- List<Tuple2<Source<T, ? extends SourceSplit, ?>,
CheckpointConverter<?, ?>>>
- sources) {
+ /** Builder for HybridSource. */
+ public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator>
+ implements Serializable {
+ private final List<SourceListEntry> sources;
+
+ public HybridSourceBuilder(Source<T, ?, ?> initialSource) {
+ this(concat(Collections.emptyList(),
SourceListEntry.of(initialSource, null)));
Review comment:
`null` -> `new NoopSourceConfigurer<>()`?
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
}
@Override
public InputStatus pollNext(ReaderOutput output) throws Exception {
+ if (currentReader == null) {
+ if (pendingSplits.isEmpty()) {
+ // no underlying reader before split assignment
+ return InputStatus.NOTHING_AVAILABLE;
Review comment:
Do we want to support a bounded hybrid? Currently, there is no
`END_OF_INPUT` return value...
I guess we would need a flag that records when we have received `noMoreSplits`
for the last reader index, and then return `END_OF_INPUT` here.
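
A minimal sketch of such a flag, assuming the reader knows how many sources there are. `onNoMoreSplits(int)`, `addRecord` and the record queue are hypothetical simplifications and do not match the real `SourceReader` signatures.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class BoundedHybridSketch {
    /** Stand-in for Flink's InputStatus, reduced to the values used in this sketch. */
    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final int lastSourceIndex;
    private final Deque<String> pendingRecords = new ArrayDeque<>();
    // Set once the final source has signalled that no more splits will arrive.
    private boolean inputFinished;

    BoundedHybridSketch(int sourceCount) {
        this.lastSourceIndex = sourceCount - 1;
    }

    void addRecord(String record) {
        pendingRecords.add(record);
    }

    /** Hypothetical hook: called when the given source index will receive no more splits. */
    void onNoMoreSplits(int sourceIndex) {
        if (sourceIndex == lastSourceIndex) {
            inputFinished = true;
        }
    }

    InputStatus pollNext() {
        if (!pendingRecords.isEmpty()) {
            pendingRecords.poll();
            return InputStatus.MORE_AVAILABLE;
        }
        // Only the last source may end the input; earlier sources just wait for the switch.
        return inputFinished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE;
    }
}
```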
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
Review comment:
`+1`, this looked weird before.
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
+ extends Serializable {
+ SourceT configure(SourceT source, FromEnumT enumerator);
+ }
+
+ private static class NoopSourceConfigurer<
+ SourceT extends Source, FromEnumT extends SplitEnumerator>
+ implements SourceConfigurer<SourceT, FromEnumT> {
+ @Override
+ public SourceT configure(SourceT source, FromEnumT enumerator) {
+ return source;
+ }
+ }
- /** Chain of sources with option to convert start position at switch-time.
*/
- public static class SourceChain<T, EnumChkT> implements Serializable {
- final List<Tuple2<Source<T, ? extends SourceSplit, ?>,
CheckpointConverter<?, ?>>> sources;
+ /** Entry for list of underlying sources. */
+ protected static class SourceListEntry implements Serializable {
+ protected final Source source;
+ protected final SourceConfigurer configurer;
- public SourceChain(Source<T, ?, EnumChkT> initialSource) {
- this(concat(Collections.emptyList(), Tuple2.of(initialSource,
null)));
+ private SourceListEntry(Source source, SourceConfigurer configurer) {
+ this.source = source;
+ this.configurer = configurer;
Review comment:
Add `checkNotNull` here; we can simply pass the `NoopSourceConfigurer` wherever
we used `null` before.
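
Something along these lines, as a sketch. `Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` so that the snippet compiles on its own, and the types are simplified stand-ins for the ones in the PR.

```java
import java.util.Objects;

final class SourceListEntrySketch {
    /** Stand-in for the PR's SourceConfigurer. */
    interface SourceConfigurer<SourceT, FromEnumT> {
        SourceT configure(SourceT source, FromEnumT enumerator);
    }

    final Object source;
    final SourceConfigurer<?, ?> configurer;

    private SourceListEntrySketch(Object source, SourceConfigurer<?, ?> configurer) {
        // Fail fast at construction instead of with an NPE at switch time.
        this.source = Objects.requireNonNull(source, "source");
        this.configurer = Objects.requireNonNull(configurer, "configurer");
    }

    static SourceListEntrySketch of(Object source, SourceConfigurer<?, ?> configurer) {
        return new SourceListEntrySketch(source, configurer);
    }
}
```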
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
Review comment:
Yes, this approach seems more flexible, but does it imply that we need to
touch all sources to allow splits to be set?
It feels like we would then end up with mutable sources instead of applying the
changes in their builders. Maybe that also implies that we should actually hold
builders in the `SourceListEntry`, but then we need a common interface for
builders (something that I wanted to do independently anyway).