In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE), isn't it?
Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> wrote:

> * your JobExecutor is _not_ putting it on the classpath.
>
> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>
> Well it happens on the client before you even hit the RestClusterClient,
> so I assume that either your jar is not packaged correctly or your
> JobExecutor is putting it on the classpath.
>
> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>
> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class
> I'm trying to use as a client towards the Flink cluster - session mode).
> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>
> The code of getBatchEnv is:
>
> @Deprecated
> public static BatchEnv getBatchEnv() {
>   // TODO use the following when ready to convert from/to datastream
>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>   customizeEnv(ret);
>   return new BatchEnv(env, ret);
> }
>
> private static void customizeEnv(TableEnvironment ret) {
>   final Configuration conf = ret.getConfig().getConfiguration();
>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>   final List<String> kryoSerializers = new ArrayList<>();
>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>
> }
>
> Classpath: [file:/tmp/job-bundle.jar]
>
> System.out: (none)
>
> System.err: (none)
>   at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>   at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>   at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>   ... 3 more
> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>   ... 13 more
>
> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Flavio,
>> can you share the full stacktrace you are seeing? I'm wondering if the
>> error happens on the client or server side (among other questions I have).
>>
>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> I was trying to use the RestClusterClient to submit my job to the Flink
>>> cluster.
>>> However, when I submit the job, Flink cannot find the classes contained in
>>> the "fat" jar. What should I do? Am I missing something in my code?
>>> This is the current client code I'm testing:
>>>
>>> public static void main(String[] args) throws MalformedURLException {
>>>   final Configuration flinkConf = new Configuration();
>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>   flinkConf.set(RestOptions.PORT, 8081);
>>>
>>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>>   final String jobClass = "it.flink.MyJob";
>>>
>>>   try {
>>>     final RestClusterClient<StandaloneClusterId> client =
>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>
>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>         .setJarFile(jarFile)//
>>>         // .setUserClassPaths(userClassPaths)
>>>         .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>         .build();
>>>
>>>     final JobGraph jobGraph =
>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>
>>>     final DetachedJobExecutionResult jobExecutionResult =
>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>
>>>     System.out.println(jobExecutionResult.getJobID());
>>>   } catch (Exception ex) {
>>>     ex.printStackTrace();
>>>     System.exit(1);
>>>   }
>>> }
>>>
>>> Best,
>>> Flavio
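
For the "jar is not packaged correctly" hypothesis raised in the thread, a minimal sketch of a check that uses only the JDK: it looks up the class entry directly inside the jar. The class JarEntryCheck and its helper methods are hypothetical names for illustration, not part of Flink; the default jar path and class name are the ones quoted in the thread. It only tells whether the entry exists in the jar, not which class loader ends up being used at runtime.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.jar.JarFile;

// Hypothetical helper (not a Flink class): checks whether a jar contains a given class.
final class JarEntryCheck {

    // Accepts "it.test.MyOb" or "it/test/MyOb" and returns the jar entry path.
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar holds an entry for the given class.
    static boolean containsClass(Path jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            return jarFile.getEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are the values quoted in the thread; override via arguments.
        Path jar = Paths.get(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String className = args.length > 1 ? args[1] : "it.test.MyOb";
        if (!Files.isRegularFile(jar)) {
            System.out.println("Jar not found: " + jar);
            return;
        }
        System.out.println(className + " present in " + jar + ": "
                + containsClass(jar, className));
    }
}
```

If this reports the class as present, the packaging is probably fine and the class loading setup on the client side (for example, how the IDE launches JobExecutor) is the more likely place to look.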