Sure, here it is. org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client for the Flink cluster (session mode). it/test/MyOb is inside the fat jar (/tmp/job-bundle.jar).
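
One way to double-check that the class really is packaged in the jar is to look up its entry directly. The following is a minimal sketch using only the JDK; the class name JarEntryCheck and its helpers toEntryName and containsClass are hypothetical names introduced here for illustration, not part of Flink. It only confirms packaging and says nothing about how Flink's class loaders resolve the class at runtime.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Flink): checks whether a jar contains a given class.
final class JarEntryCheck {

    // Converts a class name in dotted or slash form into its jar entry path,
    // e.g. "it.test.MyOb" or "it/test/MyOb" becomes "it/test/MyOb.class".
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar has a .class entry for the given class name.
    static boolean containsClass(Path jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are the jar path and class name mentioned in this message.
        Path jar = Paths.get(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String className = args.length > 1 ? args[1] : "it/test/MyOb";
        if (!Files.isRegularFile(jar)) {
            System.out.println("Jar not found: " + jar);
            return;
        }
        System.out.println(className + " in " + jar + ": " + containsClass(jar, className));
    }
}
```

If this reports the class as present, the jar contents are fine and the problem is more likely in how the class is being loaded.
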
The code of getBatchEnv is:

  @Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
  }

Classpath: [file:/tmp/job-bundle.jar]
System.out: (none)
System.err: (none)
  at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
  at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
  at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
  at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
  at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
  at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
  at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
  at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.base/java.lang.reflect.Method.invoke(Method.java:566)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
  at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
  ... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
  at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
  at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
  at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
  at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
  at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
  ... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Flavio,
> can you share the full stacktrace you are seeing? I'm wondering if the
> error happens on the client or server side (among other questions I have).
>
> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi to all,
>> I was trying to use the RestClusterClient to submit my job to the Flink
>> cluster.
>> However when I submit the job Flink cannot find the classes contained in
>> the "fat" jar..what should I do? Am I missing something in my code?
>> This is the current client code I'm testing:
>>
>> public static void main(String[] args) throws MalformedURLException {
>>   final Configuration flinkConf = new Configuration();
>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>   flinkConf.set(RestOptions.PORT, 8081);
>>
>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>   final String jobClass = "it.flink.MyJob";
>>
>>   try {
>>     final RestClusterClient<StandaloneClusterId> client =
>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>
>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>         .setJarFile(jarFile)//
>>         // .setUserClassPaths(userClassPaths)
>>         .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>         .build();
>>
>>     final JobGraph jobGraph =
>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>
>>     final DetachedJobExecutionResult jobExecutionResult =
>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>
>>     System.out.println(jobExecutionResult.getJobID());
>>   } catch (Exception ex) {
>>     ex.printStackTrace();
>>     System.exit(1);
>>   }
>> }
>>
>> Best,
>> Flavio
>>
>