becketqin commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r678743333
########## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ########## @@ -110,69 +106,64 @@ public Boundedness getBoundedness() { @Override public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() { - // List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>(); - // TODO: serializers are created on demand as underlying sources are created during switch - // sources.forEach(t -> serializers.add(castSerializer(t.source.getSplitSerializer()))); return new HybridSourceSplitSerializer(switchedSources); } @Override public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer() { - List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>(); - sources.forEach( - t -> serializers.add(castSerializer(t.source.getEnumeratorCheckpointSerializer()))); - return new HybridSourceEnumeratorStateSerializer(serializers); - } - - private static <T> SimpleVersionedSerializer<T> castSerializer( - SimpleVersionedSerializer<? extends T> s) { - @SuppressWarnings("rawtypes") - SimpleVersionedSerializer s1 = s; - return s1; + return new HybridSourceEnumeratorStateSerializer(switchedSources); } /** - * Callback for switch time customization of the underlying source, typically to dynamically set - * a start position from previous enumerator end state. + * Factory for underlying sources of {@link HybridSource}. * - * <p>Requires the ability to augment the existing source (or clone and modify). Provides the - * flexibility to set start position in any way a source allows, in a source specific way. - * Future convenience could be built on top of it, for example an implementation recognizes - * optional interfaces. + * <p>This factory permits building of a source at graph construction time or deferred at switch + * time. Provides the ability to set a start position in any way a specific source allows. + * Future convenience could be built on top of it, for example a default implementation that + * recognizes optional interfaces to transfer position in a common format. * * <p>Called when the current enumerator has finished and before the next enumerator is created. - * The enumerator end state can thus be used to set the next source's start start position. + * The enumerator end state can thus be used to set the next source's start start position. Only + * required for dynamic position transfer at time of switching. * - * <p>Only required for dynamic position transfer at time of switching, otherwise source can be - * preconfigured with a start position during job submission. + * <p>If start position is known at jib submission, the source can be constructed in the entry + * point and simply wrapped into the factory, providing the benefit of validation during + * submission. */ - public interface SourceConfigurer<SourceT extends Source, FromEnumT extends SplitEnumerator> + public interface SourceFactory<T, SourceT extends Source, FromEnumT extends SplitEnumerator> extends Serializable { - SourceT configure(SourceT source, FromEnumT enumerator); + SourceT create(FromEnumT enumerator); Review comment: @tweise Thanks for the explanation. That makes sense to me. Actually after a second thought, maybe we don't need a unified API on the _end_position_. Such _end_position_ may vary in different organizations and use cases even for the same Source. For example, a Source reading from HDFS may either define the _end_position_ as a timestamp, or a list of Kafka TopicPartition-offset pairs indicating the exact messages put into the file when the file was closed. The the definition of _end_position_ largely depends on individual users. It is difficult to provide a reasonable implementation of _end_position_ out of the box. Arguably, timestamp could be an option applicable to all the Sources, but there are still a lot of other cases. Therefore, it seems inevitable for the users to implement their own way to define and expose the _end_position_. That means the _end_position_ retrieval has to be `SplitEnumerator` implementation specific. So having a unified _end_position_ retrieval API in the `SplitEnumerator` does not have a strong use case support. It probably has been clear to everyone else except me... Sorry for the long discussion and thanks for the patient explanation. The current API looks good to me. I have no further concerns now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org