Hi Piotr,

The way I handle this is with a workflow class that uses a builder approach to 
specify inputs, outputs, and any other configuration settings.

The inputs are typically DataStream<xxx>.
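
Roughly, the shape is something like this (the class and method names, the 
String element types, and the trivial union/map logic are just illustrative 
placeholders, not my actual code):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

public class EnrichmentWorkflow {

    private DataStream<String> leftInput;   // e.g. the main event stream
    private DataStream<String> rightInput;  // e.g. reference/enrichment data

    public EnrichmentWorkflow setLeftInput(DataStream<String> leftInput) {
        this.leftInput = leftInput;
        return this;
    }

    public EnrichmentWorkflow setRightInput(DataStream<String> rightInput) {
        this.rightInput = rightInput;
        return this;
    }

    // Builds the topology and returns the result stream; the caller decides
    // where the inputs come from and what sink to attach.
    public DataStream<String> build() {
        return leftInput
            .union(rightInput)                      // stand-in for the real join logic
            .map(record -> "processed: " + record)
            .returns(Types.STRING);
    }
}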

This way I can separate out the Kafka inputs and use testing sources that give 
me very precise control over the inputs (e.g. I can hold back right-side data 
to verify that my stateful left-join function handles deferred joins properly). 
I can also use Kafka unit-test support (either kafka-junit or Spring embedded 
Kafka) if needed.
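
In a test, the same workflow gets fed from in-memory sources (or a custom 
source when I need to control timing) and runs against a local MiniCluster. A 
rough sketch, reusing the hypothetical EnrichmentWorkflow above:

import java.util.List;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class EnrichmentWorkflowTest {

    // Shared Flink MiniCluster for all tests in this class.
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
        new MiniClusterWithClientResource(
            new MiniClusterResourceConfiguration.Builder()
                .setNumberTaskManagers(1)
                .setNumberSlotsPerTaskManager(2)
                .build());

    @Test
    public void testWorkflowWithTestSources() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Test sources: plain collections instead of Kafka, so the test fully
        // controls what each input contains.
        DataStream<String> left = env.fromElements("left-1", "left-2");
        DataStream<String> right = env.fromElements("right-1");

        DataStream<String> result = new EnrichmentWorkflow()
            .setLeftInput(left)
            .setRightInput(right)
            .build();

        // Collect up to 10 results and assert on them.
        List<String> output = result.executeAndCollect(10);
        // assertions on `output` go here
    }
}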

Then in the actual tool class (with a main method) I’ll wire up the real Kafka 
sources, with whatever logic is required to convert the consumer records to 
what the workflow is expecting.
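
And that tool class wires in the real KafkaSources via env.fromSource(...). 
Again just a sketch; the topics, bootstrap servers, and SimpleStringSchema 
deserialization are placeholders for whatever conversion your job actually 
needs:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnrichmentTool {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> leftSource = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("left-topic")
            .setGroupId("enrichment-job")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        KafkaSource<String> rightSource = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("right-topic")
            .setGroupId("enrichment-job")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // Any conversion from raw Kafka records to what the workflow expects
        // would happen here (these are already Strings, so none is shown).
        DataStream<String> left =
            env.fromSource(leftSource, WatermarkStrategy.noWatermarks(), "left-source");
        DataStream<String> right =
            env.fromSource(rightSource, WatermarkStrategy.noWatermarks(), "right-source");

        new EnrichmentWorkflow()
            .setLeftInput(left)
            .setRightInput(right)
            .build()
            .print();   // stand-in for the real sink

        env.execute("Enrichment job");
    }
}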

— Ken

> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> 
> Hi,
> 
> I'm wondering: what is the recommended way to structure a job that one would 
> like to test later on with `MiniCluster`?
> 
> I've looked at the flink-training repository examples [1] and they tend to 
> expose the main job as a class that accepts a `SourceFunction` and a 
> `SinkFunction`, which makes sense. But then, my job is normally constructed 
> with a `KafkaSource` which is then passed to `env.fromSource(...)`.
> 
> Is there any recommended way of handling these discrepancies, i.e. having to 
> use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> 
> [1] 
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> 
> -- 
> Piotr Domagalski

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
