Hi everyone!
Sorry for reopening the thread, but I am having some problems related to this
topic while migrating our code from Flink 1.12 to Flink 1.15.
We have a base project that encapsulates a lot of common code and
configuration. One of its abstractions is an AbstractDataStreamJob class with
generic Sources and Sinks. We have implemented it this way since Flink 1.8,
following the recommendations of the Flink documentation [1]:
"Apache Flink provides a JUnit rule called MiniClusterWithClientResource for
testing complete jobs against a local, embedded mini cluster.
...
A few remarks on integration testing with MiniClusterWithClientResource:
- In order not to copy your whole pipeline code from production to test, make
sources and sinks pluggable in your production code and inject special test
sources and test sinks in your tests.
..."
This way, we can create the real Kafka Sources and Sinks in the Main class of
the job, create the test Sources and Sinks in the JUnit tests, and inject
either of them into the AbstractDataStreamJob class.
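A minimal sketch of this pattern with the new Source interface, assuming Flink 1.15 (`flink-streaming-java`, plus `flink-clients` for local execution) on the classpath. The shape of `AbstractDataStreamJob` here (constructor arguments, `transform`, `run`) is a guess, and `DoublingJob` and `CollectSink` are hypothetical. The test sink follows the static-list style used in the Flink testing docs, and the test source is Flink's own `NumberSequenceSource`, which already implements the new Source interface but only emits numbers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Hypothetical base class: source and sink are injected, subclasses only
// define the transformation in between.
abstract class AbstractDataStreamJob<IN, OUT> {
    private final Source<IN, ?, ?> source;
    private final SinkFunction<OUT> sink;

    protected AbstractDataStreamJob(Source<IN, ?, ?> source, SinkFunction<OUT> sink) {
        this.source = source;
        this.sink = sink;
    }

    protected abstract DataStream<OUT> transform(DataStream<IN> input);

    public void run(StreamExecutionEnvironment env) throws Exception {
        DataStream<IN> input =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "input");
        transform(input).addSink(sink);
        env.execute(getClass().getSimpleName());
    }
}

// Example job: doubles every incoming number.
class DoublingJob extends AbstractDataStreamJob<Long, Long> {
    DoublingJob(Source<Long, ?, ?> source, SinkFunction<Long> sink) {
        super(source, sink);
    }

    @Override
    protected DataStream<Long> transform(DataStream<Long> input) {
        return input.map(x -> x * 2);
    }
}

// Test sink: values go into a static list because Flink serializes the sink
// instance, so its instance fields would not be visible to the test.
class CollectSink implements SinkFunction<Long> {
    static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Long value, SinkFunction.Context context) {
        VALUES.add(value);
    }
}

public class PluggableJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // In production, a KafkaSource and a Kafka sink would be passed instead.
        new DoublingJob(new NumberSequenceSource(1, 3), new CollectSink()).run(env);
        System.out.println(CollectSink.VALUES);
    }
}
```

The job class never knows whether it talks to Kafka or to test fixtures; only the caller decides.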
The problem comes with the new Source interface and the end-to-end tests
against the local embedded mini cluster. Prior to Flink 1.15, we used
FromElementsFunction to create the test SourceFunction. Now that we have changed
the code to use the new Source interface, we can no longer use
FromElementsFunction, and we haven't found an equivalent FromElementsSource class
with the same functionality implemented on the new Source API.
We want to keep the same structure in the AbstractDataStreamJob class (with
generic and pluggable sources and sinks), as we think it is the most elegant
and generic solution.
Is there a plan to implement a FromElementsSource class based on the new
Source API? Is there any other alternative that could serve as a workaround in
the meantime?
We have tried to implement a custom Source for this use case, but it seems like
an overwhelming task, and we do not want to reinvent the wheel either. If a
FromElementsSource is planned, we would rather wait for it.
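One possible stopgap in Flink 1.15, sketched under assumptions: build a finite test stream from the new-API sequence source (`env.fromSequence`, backed by `NumberSequenceSource`) and map each index to an element of an in-memory list. This yields a `DataStream`, not a `Source` object, so it assumes the job's injection point accepts a `DataStream` (or a factory producing one) rather than a `Source`. `FromListWorkaround` and `fromList` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromListWorkaround {

    // Hypothetical helper: emits the given elements using only new-Source-API
    // building blocks. Order is not guaranteed when parallelism > 1, because
    // the index range is split across parallel readers.
    static <T> DataStream<T> fromList(
            StreamExecutionEnvironment env, List<T> elements, TypeInformation<T> type) {
        if (elements.isEmpty()) {
            throw new IllegalArgumentException("elements must not be empty");
        }
        // ArrayList is Serializable, so the lambda below can be shipped to the cluster.
        final ArrayList<T> copy = new ArrayList<>(elements);
        return env.fromSequence(0, copy.size() - 1)
                .map(i -> copy.get(i.intValue()))
                // The lambda's generic result type is erased, so declare it explicitly.
                .returns(type);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> out =
                fromList(env, Arrays.asList("a", "b", "c"), Types.STRING).executeAndCollect(3);
        System.out.println(out);
    }
}
```

With a factory-style injection point (an assumed design), production code could pass `env -> env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka")` while a test passes `env -> FromListWorkaround.fromList(env, testData, typeInfo)`.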
Thanks!
Carlos
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/testing/#junit-rule-miniclusterwithclientresource
-----Original Message-----
From: Qingsheng Ren
Sent: Wednesday, May 25, 2022 12:10
To: Piotr Domagalski
Cc: user@flink.apache.org
Subject: [External] Re: Source vs SourceFunction and testing
Glad to see you have resolved the issue!
If you want to learn more about the Source API, the Flink documentation [1]
describes it in detail. The original proposal, FLIP-27 [2], is also a good
reference.
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
Cheers,
Qingsheng
> On May 25, 2022, at 17:54, Piotr Domagalski wrote:
>
> Thank you Qingsheng, this context helps a lot!
>
> And once again thank you all for being such a helpful community!
>
> P.S. I actually struggled for a bit trying to understand why my refactored
> solution, which accepts a DataStream<>, wouldn't work ("no operators defined in
> the streaming topology"). It turns out that my assumption that I could call
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get
> the same environment was wrong. I had the env.addSource and env.fromSource
> calls on one instance of the environment, but then called env.execute() on
> another instance :facepalm:
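The fix described above amounts to obtaining the environment once and passing that same instance to every piece of code that adds sources or operators. A minimal sketch, assuming Flink 1.15 on the classpath; `SharedEnvironmentSketch` and `buildPipeline` are hypothetical names, and `fromSequence` stands in for the real sources.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedEnvironmentSketch {

    // Adds operators to the environment it is given; it never calls
    // StreamExecutionEnvironment.getExecutionEnvironment() itself.
    static DataStream<Long> buildPipeline(StreamExecutionEnvironment env) {
        return env.fromSequence(1, 3).map(x -> x * 10);
    }

    public static void main(String[] args) throws Exception {
        // Obtain the environment once...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipeline(env).print();
        // ...and execute that same instance. A second getExecutionEnvironment()
        // call may return a different, empty environment, and executing that
        // one fails because it has no operators registered.
        env.execute("shared-environment-sketch");
    }
}
```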
>
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren wrote:
> Hi Piotr,
>
> I’d like to share my understanding of this. Source and SourceFunction are
> both interfaces to data sources. SourceFunction was designed and introduced
> earlier, and as the project evolved, many shortcomings emerged. Therefore, the
> community redesigned the source interface and introduced the new Source API
> in FLIP-27 [1].
>
> Eventually we will deprecate SourceFunction and use Source as the only
> interface for all data sources, but considering the huge cost of migration,
> you’ll see SourceFunction and Source co-exist for some time. For example, the
> ParallelTestSource you mentioned is still based on SourceFunction, while
> KafkaSource, as a pioneer, has already migrated to the new Source API.
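To illustrate this co-existence, a minimal sketch (assuming Flink 1.15 on the classpath) that attaches a legacy `SourceFunction` through `addSource` and a FLIP-27 `Source` (`NumberSequenceSource`) through `fromSource` in the same job, then unions the two streams. `LegacyNumbers` and `CoexistenceSketch` are hypothetical names.

```java
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Legacy API: the source drives its own emit loop inside run().
class LegacyNumbers implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        for (long v = 100; v <= 102 && running; v++) {
            // Emit under the checkpoint lock, as the SourceFunction contract requires.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(v);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

public class CoexistenceSketch {
    static DataStream<Long> build(StreamExecutionEnvironment env) {
        // Old API: SourceFunction attached via addSource.
        DataStream<Long> legacy = env.addSource(new LegacyNumbers());
        // New API (FLIP-27): Source attached via fromSource with a WatermarkStrategy.
        DataStream<Long> modern = env.fromSource(
                new NumberSequenceSource(1, 3), WatermarkStrategy.noWatermarks(), "numbers");
        return legacy.union(modern);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Long> out = build(env).executeAndCollect(6);
        System.out.println(out);
    }
}
```

Downstream operators see a single DataStream either way, which is why both kinds of sources can be mixed during the migration period.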
>
> I think the API to