Hey Κωνσταντίνος, check out this sample code we use for testing https://github.com/metarank/metarank . It is in scala, but should be quite straightforward to port to java:
val cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) // here it is .setNumberSlotsPerTaskManager(1) .setConfiguration(conf) .build() ) cluster.before() val client = cluster.getClusterClient val env = new StreamExecutionEnvironment(new TestStreamEnvironment(cluster.getMiniCluster, 1)) // do some stuff with env val graph = env.getStreamGraph.getJobGraph client.submitJob(graph) // here you need to wait until the job is finished // you may use cluster client instance here to poll for completion client.close() cluster.after() For a full working example, check these files: * https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/AsyncFlinkJob.scala * https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/inference/FlinkMinicluster.scala with best regards, Roman Grebennikov | g...@dfdx.me On Sat, May 14, 2022, at 18:45, Αγαπίδης wrote: > Hi list, > > I am using Java 8, Flink 1.15, and IntelliJ. > > I wonder if it is posible to open an additional TaskManager in a Stream > Job in the Java code, to run it in IntelliJ Local Cluster (MiniCluster) > Debug mode. I found this method in the code reference, but I don't know > hot to call it. > > https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html#startTaskManager-- > > I thought that I need to get the current MiniCluster somehow, before > starting the Job Excecution. But how? > > Thank you in advance! > > -- > Best Regards, > Kostas