Hi Piotr,

Yes, that should work (using DataStream<xxx> as the common result of both source creation options).
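For example, here's a rough sketch of the pattern (EventData, EventWorkflow, and the helper/method names are placeholders, not from a real project):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// The workflow only sees DataStream<EventData>, so it neither knows nor
// cares whether the stream came from env.fromSource() or env.addSource().
public class EventWorkflow {

    private DataStream<EventData> input;

    public EventWorkflow setInput(DataStream<EventData> input) {
        this.input = input;
        return this;
    }

    public DataStream<EventData> build() {
        // Real business logic goes here; it operates purely on DataStreams.
        return input.filter(event -> event.isValid());
    }
}

// Production wiring, in the tool class (the one with the main method):
DataStream<EventData> kafka = env.fromSource(
    getKafkaSource(params), WatermarkStrategy.noWatermarks(), "Kafka");
new EventWorkflow().setInput(kafka).build();

// Test wiring, in the MiniCluster test, using the flink-training ParallelTestSource:
DataStream<EventData> testInput = env.addSource(new ParallelTestSource<>(event1, event2));
new EventWorkflow().setInput(testInput).build();

Either way, the workflow code itself never touches a Source or SourceFunction directly.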
— Ken

> On May 24, 2022, at 12:19 PM, Piotr Domagalski <pi...@domagalski.com> wrote:
>
> Hi Ken,
>
> Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
> navigating the type system and still being confused about the differences
> between Source, SourceFunction, DataStream, DataStreamOperator, etc.
>
> I think the DataStream<> type is what I'm looking for? That is, I can then use:
>
> DataStream<EventData> source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> when using KafkaSource in the normal setup
>
> and
>
> DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> when using the testing source [1]
>
> Does that sound right?
>
> [1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
>
> On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream<xxx>.
>
> This way I can separate out the Kafka inputs and use testing sources that
> give me very precise control over the inputs (e.g. I can hold back the
> right-side data to ensure my stateful left-join junction is handling
> deferred joins properly). I can also use Kafka unit test support (either
> kafka-junit or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I'll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
>> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
>>
>> Hi,
>>
>> I'm wondering: what is the recommended way to structure a job that one
>> would like to test later on with `MiniCluster`?
>>
>> I've looked at the flink-training repository examples [1] and they tend to
>> expose the main job as a class that accepts a `SourceFunction` and a
>> `SinkFunction`, which makes sense. But my job is normally constructed with
>> `KafkaSource`, which is then passed to `env.fromSource(...)`.
>>
>> Is there any recommended way of handling this discrepancy, i.e. having to
>> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>>
>> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>>
>> --
>> Piotr Domagalski
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
> --
> Piotr Domagalski

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch