Hi Piotr,

The way I handle this is via a workflow class that uses a builder approach for specifying inputs, outputs, and any other configuration settings.
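To make the shape concrete, here's a minimal sketch of that builder pattern. All names here are illustrative, and it uses the JDK's java.util.stream.Stream as a stand-in for Flink's DataStream so the snippet stays self-contained; in a real Flink job the fields would be DataStream<...>, the test would set them via env.fromElements(...), and the tool's main() would set them from env.fromSource(kafkaSource, ...).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Illustrative sketch only: Stream<T> stands in for Flink's DataStream<T>.
class EnrichmentWorkflow {
    private Stream<String> leftInput;   // in Flink: DataStream<LeftRecord>
    private Stream<String> rightInput;  // in Flink: DataStream<RightRecord>

    EnrichmentWorkflow setLeftInput(Stream<String> input) {
        this.leftInput = input;
        return this;
    }

    EnrichmentWorkflow setRightInput(Stream<String> input) {
        this.rightInput = input;
        return this;
    }

    // In a real Flink job this would wire up the join/operators and return
    // the output DataStream; here it just concatenates to stay runnable.
    List<String> build() {
        if (leftInput == null || rightInput == null) {
            throw new IllegalStateException("both inputs must be set");
        }
        return Stream.concat(leftInput, rightInput).collect(Collectors.toList());
    }
}

public class Main {
    public static void main(String[] args) {
        // In a test, you plug in precisely controlled inputs here
        // instead of Kafka sources.
        List<String> out = new EnrichmentWorkflow()
                .setLeftInput(Stream.of("l1", "l2"))
                .setRightInput(Stream.of("r1"))
                .build();
        System.out.println(out);  // [l1, l2, r1]
    }
}
```

The point of the pattern is that the workflow never knows where its inputs come from: the test harness and the production tool class each construct the inputs their own way and hand them to the same builder.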
The inputs are typically DataStream<xxx>. This way I can separate out the Kafka inputs and use testing sources that give me very precise control over the inputs (e.g. I can hold back right-side data to ensure my stateful left-join junction is handling deferred joins properly). I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.

Then in the actual tool class (with a main method) I'll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure a job which one would
> like to test later on with `MiniCluster`?
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>
> Is there any recommended way of handling these discrepancies, i.e. having to
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch