Hi all,
I've tried to migrate my very simple Elasticsearch SourceFunction (which
uses the scroll API and produces batches of documents) to the new Source
API, but I gave up because it's too complicated. It should be much simpler
to migrate such a function to a bounded or unbounded source.
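For context, this is roughly what I mean by a "very simple" legacy source
(a minimal sketch; the class name and fetchNextBatch() are placeholders for
the actual Elasticsearch scroll call):

    import java.util.List;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Simplified legacy source: pull one scroll batch at a time and emit it.
    public class ScrollingEsSource implements SourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                List<String> batch = fetchNextBatch(); // stands in for the scroll request
                if (batch.isEmpty()) {
                    break; // scroll exhausted -> behaves as a bounded source
                }
                synchronized (ctx.getCheckpointLock()) {
                    for (String doc : batch) {
                        ctx.collect(doc);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private List<String> fetchNextBatch() {
            return List.of(); // placeholder instead of a real Elasticsearch client call
        }
    }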
Before completely removing SourceFunction and the DataSet API, I think it
would be better to provide a more detailed migration guide.
At least simplify the creation of a bounded DataSet... I haven't really
looked into DataGeneratorSource yet, though.
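From a quick skim of the javadocs, I assume a bounded stream of generated
records would look roughly like this (untested sketch on my side; the
generator function and record count are made up):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    import org.apache.flink.connector.datagen.source.GeneratorFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BoundedDataGenExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Bounded source: emits exactly 100 generated records, then finishes.
            GeneratorFunction<Long, String> generator = index -> "doc-" + index;
            DataGeneratorSource<String> source =
                    new DataGeneratorSource<>(generator, 100, Types.STRING);

            DataStreamSource<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen");

            stream.print();
            env.execute();
        }
    }

Something equally short for wrapping an existing bounded input (like my
scroll-based source) is what I'm missing.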
A review of the current online documentation is mandatory IMO.

Best,
Flavio


On Mon, Jul 3, 2023 at 5:58 PM Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> I am happy to announce that the blocker has been resolved and
> SourceFunction
> is now marked as @Deprecated [1].
>
> The work continues to remove the dependencies on the SourceFunction API in
> Flink internals in order to prepare for dropping it completely in Flink
> 2.0.
>
> I'd like to get some opinions on an open question I currently have:
> StreamExecutionEnvironment#fromCollection() methods need to be modified to
> use
> the new FLIP-27 DataGeneratorSource [2]. This presents an issue because
> ITCases in DataGeneratorSource rely on StreamExecutionEnvironment, so we
> end
> up with a circular dependency.
>
> I see two main options here:
>
> 1. Split the tests from the DataGeneratorSource into a separate module
>    called flink-connector-datagen-tests.
>    This is a rather straightforward solution that breaks the cycle, but so
>    far we have managed to avoid such workarounds, and I'd like to know if
>    anyone has a strong opinion against it.
>
> 2. Move #fromCollection() methods into flink-connector-datagen, so
>    StreamExecutionEnvironment#fromCollection() becomes
>    DataGeneratorSource#fromCollection()
>    While this deviates from the familiar pattern, it should be acceptable
>    given the major version change. The key question here is whether we
>    should also introduce a dependency from flink-connector-datagen to
>    flink-streaming-java. This dependency does not exist in other
>    connectors, but it would enhance usability. Without it, the user code
>    would look somewhat like this:
>
>    Collection<Integer> data = ...;
>    DataGeneratorSource<Integer> collectionSource =
>        DataGeneratorSource.fromCollection(data);
>    DataStreamSource<Integer> source = env.fromSource(collectionSource,
>        WatermarkStrategy.forMonotonousTimestamps(), "Collection source")
>        .forceNonParallel();
>
>    The necessity of the forceNonParallel()/setParallelism(1) call is
>    especially concerning because it is easy to forget.
>
>    With the dependency, we can hide the internal details and achieve an API
>    closer to the current #fromCollection() implementation:
>
>    Collection<Integer> data = ...;
>    DataStreamSource<Integer> source =
>        DataGeneratorSource.fromCollection(env, data);
>
> I would appreciate hearing your thoughts and suggestions on this matter.
>
> [1] https://github.com/apache/flink/pull/20049
> [2] https://github.com/apache/flink/pull/22850
>
> Best,
> Alex
>
>
>
>
> On Wed, 21 Jun 2023 at 19:27, Alexander Fedulov <
> alexander.fedu...@gmail.com>
> wrote:
>
> > I'd like to revive the efforts to deprecate the SourceFunction API.
> >
> > It would be great to get a review for this PR:
> > https://github.com/apache/flink/pull/21774
> >
> > It immediately unblocks marking the actual SourceFunction as deprecated.
> > https://github.com/apache/flink/pull/20049
> >
> > There is also this work thread related
> > to StreamExecutionEnvironment#fromCollection() methods.
> > The discussion seems to have stalled:
> > https://github.com/apache/flink/pull/21028
> >
> > Thanks,
> > Alex
> >
> > On 2022/06/15 19:30:31 Alexander Fedulov wrote:
> > > Thank you all for your valuable input and participation in the
> discussion
> > >
> > > The vote is open now [1]
> > >
> > > [1] https://lists.apache.org/thread/kv9rj3w2rmkb8jtss5bqffhw57or7v8v
> > >
> > > Best,
> > > Alexander Fedulov
> >
> >
