Hey Κωνσταντίνος,

check out this sample code we use for testing 
https://github.com/metarank/metarank . It is in scala, but should be quite 
straightforward to port to java:

    val cluster = new MiniClusterWithClientResource(
      new MiniClusterResourceConfiguration.Builder()
        .setNumberTaskManagers(1) // here it is
        .setNumberSlotsPerTaskManager(1)
        .setConfiguration(conf)
        .build()
    )
    cluster.before()
    val client = cluster.getClusterClient
    val env = new StreamExecutionEnvironment(new 
TestStreamEnvironment(cluster.getMiniCluster, 1))
    // do some stuff with env
    val graph = env.getStreamGraph.getJobGraph
    client.submitJob(graph)
    // here you need to wait until the job is finished
    // you may use cluster client instance here to poll for completion
    client.close()
    cluster.after()

For a full working example, check these files:
* 
https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/AsyncFlinkJob.scala
* 
https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/inference/FlinkMinicluster.scala

with best regards,
Roman Grebennikov | g...@dfdx.me

On Sat, May 14, 2022, at 18:45,  Αγαπίδης wrote:
> Hi list,
>
> I am using Java 8, Flink 1.15, and IntelliJ.
>
> I wonder if it is posible to open an additional TaskManager in a Stream 
> Job in the Java code, to run it in IntelliJ Local Cluster (MiniCluster) 
> Debug mode. I found this method in the code reference, but I don't know 
> hot to call it.
>
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html#startTaskManager--
>
> I thought that I need to get the current MiniCluster somehow, before 
> starting the Job Excecution. But how?
>
> Thank you in advance!
>
> --
> Best Regards,
> Kostas

Reply via email to