Hi all, I tried to migrate my very simple Elasticsearch SourceFunction (which uses the scroll API and produces batches of documents) to the new Source API, but I gave up because it is too complicated. It should be much simpler to migrate such a function to a bounded or unbounded source. Before completely removing SourceFunction and DataSet, I think it would be better to provide a more detailed migration guide, or at least to simplify the creation of a bounded DataSet... I haven't taken a look at DataGeneratorSource yet, though. A review of the current online documentation is mandatory, IMO.
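For context, the batching pattern described above can be sketched without any Flink or Elasticsearch dependencies. This is only an illustration of the loop shape: `fetchBatch` is a hypothetical stand-in for a real Elasticsearch scroll call, not an actual client API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Flink-free sketch of a scroll-style bounded read loop:
// fetch a batch, emit its documents, repeat until the batch comes back empty.
// fetchBatch is a hypothetical stand-in for a real Elasticsearch scroll call.
public class ScrollLoopSketch {
    static List<String> readAll(Function<Integer, List<String>> fetchBatch) {
        List<String> out = new ArrayList<>();
        int page = 0;
        while (true) {
            List<String> batch = fetchBatch.apply(page++);
            if (batch.isEmpty()) {
                break; // scroll exhausted -> the source is bounded
            }
            out.addAll(batch); // in a real source this would be ctx.collect(...)
        }
        return out;
    }
}
```

In the legacy API this loop lives entirely inside SourceFunction#run(); the friction Flavio describes is that the FLIP-27 Source API asks you to express the same thing through split enumerators and readers.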
Best,
Flavio

On Mon, Jul 3, 2023 at 5:58 PM Alexander Fedulov <alexander.fedu...@gmail.com> wrote:

> I am happy to announce that the blocker has been resolved and SourceFunction
> is now marked as @Deprecated [1].
>
> The work continues to remove the dependencies on the SourceFunction API in
> Flink internals in order to prepare for dropping it completely in Flink 2.0.
>
> I'd like to get some opinions on an open question I currently have:
> StreamExecutionEnvironment#fromCollection() methods need to be modified to
> use the new FLIP-27 DataGeneratorSource [2]. This presents an issue because
> ITCases in DataGeneratorSource rely on StreamExecutionEnvironment, so we end
> up with a circular dependency.
>
> I see two main options here:
>
> 1. Split the tests from DataGeneratorSource into a separate module called
> flink-connector-datagen-tests.
> This is a rather straightforward solution that breaks the cycle, but so far
> we have managed to avoid such workarounds, and I'd like to know if anyone
> has a strong opinion against it.
>
> 2. Move the #fromCollection() methods into flink-connector-datagen, so that
> StreamExecutionEnvironment#fromCollection() becomes
> DataGeneratorSource#fromCollection().
> While this deviates from the familiar pattern, it should be acceptable given
> the major version change. The key question here is whether we should also
> introduce a dependency from flink-connector-datagen to flink-streaming-java.
> This dependency does not exist in other connectors, but it would enhance
> usability.
> Without it, the user code would look somewhat like this:
>
> Collection<Integer> data = ...;
> DataGeneratorSource<Integer> collectionSource =
>     DataGeneratorSource.fromCollection(data);
> DataStreamSource<Integer> source = env.fromSource(collectionSource,
>         WatermarkStrategy.forMonotonousTimestamps(), "Collection source")
>     .forceNonParallel();
>
> Especially the necessity for the forceNonParallel()/setParallelism(1) call
> is concerning, because it is easy to forget.
>
> With the dependency, we can hide the internal details and achieve an API
> closer to the current #fromCollection() implementation:
>
> Collection<Integer> data = ...;
> DataStreamSource<Integer> source =
>     DataGeneratorSource.fromCollection(env, data);
>
> I would appreciate hearing your thoughts and suggestions on this matter.
>
> [1] https://github.com/apache/flink/pull/20049
> [2] https://github.com/apache/flink/pull/22850
>
> Best,
> Alex
>
> On Wed, 21 Jun 2023 at 19:27, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:
>
> > I'd like to revive the efforts to deprecate the SourceFunction API.
> >
> > It would be great to get a review for this PR:
> > https://github.com/apache/flink/pull/21774
> >
> > It immediately unblocks marking the actual SourceFunction as deprecated:
> > https://github.com/apache/flink/pull/20049
> >
> > There is also a work thread related to the
> > StreamExecutionEnvironment#fromCollection() methods.
> > The discussion seems to have stalled:
> > https://github.com/apache/flink/pull/21028
> >
> > Thanks,
> > Alex
> >
> > On 2022/06/15 19:30:31 Alexander Fedulov wrote:
> > > Thank you all for your valuable input and participation in the
> > > discussion.
> > >
> > > The vote is open now [1]
> > >
> > > [1] https://lists.apache.org/thread/kv9rj3w2rmkb8jtss5bqffhw57or7v8v
> > >
> > > Best,
> > > Alexander Fedulov
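To make the quoted concern about forceNonParallel()/setParallelism(1) concrete: if a collection-backed source naively replays the whole collection in every parallel subtask, the job's output contains duplicates. The following is a Flink-free illustration of that failure mode in plain Java (`naiveParallelRead` is a simulation of parallel subtasks for illustration, not a Flink API).

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates why a naive collection source must run with parallelism 1:
// if every parallel subtask replays the full collection, the output is duplicated.
public class ParallelismHazard {
    // Simulates 'parallelism' subtasks, each emitting the whole collection
    // (the bug you get when the parallelism-1 constraint is forgotten).
    static List<Integer> naiveParallelRead(List<Integer> data, int parallelism) {
        List<Integer> out = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            out.addAll(data); // every subtask re-emits the same elements
        }
        return out;
    }
}
```

With parallelism 1 the output matches the input; with parallelism 3 every element appears three times. Hiding the parallelism constraint inside a fromCollection(env, data) factory, as proposed above, removes this foot-gun from user code.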