Glad to see you have resolved the issue! 

If you want to learn more about the Source API, the Flink documentation [1]
has a detailed description of it. The original proposal, FLIP-27 [2], is also
a good reference.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers, 

Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
> 
> Thank you Qingsheng, this context helps a lot!
> 
> And once again thank you all for being such a helpful community!
> 
> P.S. I actually struggled for a bit trying to understand why my refactored
> solution, which accepts a DataStream<>, wouldn't work ("no operators defined
> in the streaming topology"). It turns out my assumption that I could call
> StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get
> the same environment was wrong. I had env.addSource and env.fromSource calls
> on one instance of the environment, but then called env.execute() on another
> instance :facepalm:
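> 
> For anyone else hitting this, a minimal sketch of the broken pattern vs. the
> fix (the fromElements data is just a stand-in for the real sources):
> 
>     // Broken: in a plain local setup, each getExecutionEnvironment() call
>     // returns a fresh environment, so the topology is built on env1 but
>     // execute() runs on the empty env2.
>     StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
>     env1.fromElements(1, 2, 3).print();
>     StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
>     env2.execute(); // fails: "no operators defined in the streaming topology"
> 
>     // Fixed: build the topology and call execute() on the same instance.
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.fromElements(1, 2, 3).print();
>     env.execute();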
> 
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> Hi Piotr,
> 
> I’d like to share my understanding of this. Source and SourceFunction are
> both interfaces to data sources. SourceFunction was designed and introduced
> earlier, and as the project evolved, many shortcomings emerged. Therefore,
> the community re-designed the source interface and introduced the new Source
> API in FLIP-27 [1].
> 
> Eventually the community will deprecate SourceFunction and use Source as the
> only interface for all data sources, but considering the huge cost of
> migration you’ll see SourceFunction and Source co-exist for some time: the
> ParallelTestSource you mentioned is still built on SourceFunction, while
> KafkaSource, as a pioneer, has already migrated to the new Source API.
> 
> I think the API for end users didn't change much: both
> env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream,
> and you can apply downstream transformations to it.
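> 
> For illustration, a rough sketch of what that looks like (EventData,
> kafkaSource and testEvents are placeholders, not real names from your code):
> 
>     // New Source API: Source + WatermarkStrategy + a source name.
>     DataStream<EventData> a =
>         env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka");
> 
>     // Legacy API: SourceFunction.
>     DataStream<EventData> b = env.addSource(new ParallelTestSource<>(testEvents));
> 
>     // Either way, the downstream pipeline just works on a DataStream.
>     a.union(b).print();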
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>  
> 
> Cheers,
> 
> Qingsheng
> 
> > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> > 
> > Hi Ken,
> > 
> > Thanks Ken. I guess the problem I had was that, as a complete newbie to
> > Flink, I was still finding my way around the type system and confused about
> > the differences between Source, SourceFunction, DataStream,
> > DataStreamOperator, etc.
> > 
> > I think the DataStream<> type is what I'm looking for? That is, I can then
> > use:
> > 
> >     DataStream<EventData> source =
> >         env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > 
> > when using KafkaSource in the normal setup, and
> > 
> >     DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > 
> > when using the testing source [1].
> > 
> > Does that sound right?
> > 
> > [1] 
> > https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> > 
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> 
> > wrote:
> > Hi Piotr,
> > 
> > The way I handle this is via a workflow class that uses a builder approach 
> > to specifying inputs, outputs, and any other configuration settings.
> > 
> > The inputs are typically DataStream<xxx>.
> > 
> > This way I can separate out the Kafka inputs, and use testing sources that
> > give me very precise control over the inputs (e.g. I can hold back the
> > right-side data to ensure my stateful left join function is handling
> > deferred joins properly). I can also use Kafka unit test support (either
> > kafka-junit or Spring embedded Kafka) if needed.
> > 
> > Then in the actual tool class (with a main method) I’ll wire up the real 
> > Kafka sources, with whatever logic is required to convert the consumer 
> > records to what the workflow is expecting.
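> > 
> > A bare-bones sketch of the shape I mean (all class and method names here
> > are just illustrative):
> > 
> >     // The workflow only sees DataStream inputs and a sink, so tests can
> >     // plug in precise test sources while the tool's main() wires up Kafka.
> >     public class MyWorkflow {
> >         private DataStream<Event> input;
> >         private Sink<Result> sink;
> > 
> >         public MyWorkflow setInput(DataStream<Event> input) {
> >             this.input = input;
> >             return this;
> >         }
> > 
> >         public MyWorkflow setSink(Sink<Result> sink) {
> >             this.sink = sink;
> >             return this;
> >         }
> > 
> >         public void build() {
> >             // The actual business logic lives here, independent of
> >             // where the input came from.
> >             input.map(new EnrichFunction()).sinkTo(sink);
> >         }
> >     }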
> > 
> > — Ken
> > 
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> >> 
> >> Hi,
> >> 
> >> I'm wondering: what is the recommended way to structure a job which one
> >> would like to test later on with `MiniCluster`?
> >> 
> >> I've looked at the flink-training repository examples [1] and they tend to
> >> expose the main job as a class that accepts a `SourceFunction` and a
> >> `SinkFunction`, which makes sense. But then, my job is normally constructed
> >> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
> >> 
> >> Is there any recommended way of handling these discrepancies, i.e. having
> >> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >> 
> >> [1] 
> >> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >> 
> >> -- 
> >> Piotr Domagalski
> > 
> > --------------------------
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> > 
> > 
> > 
> > 
> > 
> > -- 
> > Piotr Domagalski
> 
> 
> 
> -- 
> Piotr Domagalski
