Always the same problem. Caused by: java.lang.ClassNotFoundException: it.test.XXX at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ... 10 more
I've also tried with flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first"); but nothing changes. On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote: > hmm..it appears as if PackagedProgramUtils#createJobGraph does some things > outside the usercode classlodaer (getPipelineFromProgram()), specifically > the call to the main method. > > @klou This seems like wrong behavior? > > @Flavio What you could try in the meantime is wrap the call to > createJobGraph like this: > > final ClassLoader contextClassLoader = > Thread.currentThread().getContextClassLoader();try { > > Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); > // do tstuff} finally { > Thread.currentThread().setContextClassLoader(contextClassLoader);} > > > On 10/28/2020 10:12 AM, Flavio Pompermaier wrote: > > Any help here? How can I understand why the classes inside the jar are > not found when creating the PackagedProgram? > > On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> > wrote: > >> In the logs I see that the jar is the classpath (I'm trying to debug the >> program from the IDE)..isn'it? >> >> Classpath: [file:/tmp/job-bundle.jar] >> ... >> >> Best, >> Flavio >> >> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> >> wrote: >> >>> * your JobExecutor is _not_ putting it on the classpath. >>> >>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote: >>> >>> Well it happens on the client before you even hit the RestClusterClient, >>> so I assume that either your jar is not packaged correctly or you your >>> JobExecutor is putting it on the classpath. >>> >>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote: >>> >>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main >>> class I'm trying to use as a client towards the Flink cluster - session >>> mode). >>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar). >>> >>> The code of getBatchEnv is: >>> >>> @Deprecated >>> public static BatchEnv getBatchEnv() { >>> // TODO use the following when ready to convert from/to datastream >>> // return >>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build()); >>> ExecutionEnvironment env = >>> ExecutionEnvironment.getExecutionEnvironment(); >>> BatchTableEnvironment ret = BatchTableEnvironment.create(env); >>> customizeEnv(ret); >>> return new BatchEnv(env, ret); >>> } >>> >>> private static void customizeEnv(TableEnvironment ret) { >>> final Configuration conf = ret.getConfig().getConfiguration(); >>> // >>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, >>> 2); >>> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR); >>> conf.setString(BlobServerOptions.STORAGE_DIRECTORY, >>> FLINK_TEST_TMP_DIR); >>> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); >>> //NOSONAR >>> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, >>> 0.4f);//NOSONAR >>> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 >>> * 2);//NOSONAR >>> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 >>> * 2);// NOSONAR >>> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// >>> NOSONAR >>> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR >>> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR >>> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR >>> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// >>> NOSONAR >>> final List<String> kryoSerializers = new ArrayList<>(); >>> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, >>> JodaDateTimeSerializer.class)); >>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, >>> TBaseSerializer.class)); >>> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, >>> TBaseSerializer.class)); >>> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers); >>> >>> } >>> >>> Classpath: [file:/tmp/job-bundle.jar] >>> >>> System.out: (none) >>> >>> System.err: (none) >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245) >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164) >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77) >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109) >>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42) >>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb >>> at >>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116) >>> at >>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95) >>> at >>> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73) >>> at >>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >>> Method) >>> at >>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>> at >>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >>> at >>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) >>> at >>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >>> at >>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150) >>> ... 3 more >>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb >>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) >>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) >>> at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) >>> at >>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) >>> at >>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) >>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) >>> ... 13 more >>> >>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hi Flavio, >>>> can you share the full stacktrace you are seeing? I'm wondering if the >>>> error happens on the client or server side (among other questions I have). >>>> >>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier < >>>> pomperma...@okkam.it> wrote: >>>> >>>>> Hi to all, >>>>> I was trying to use the RestClusterClient to submit my job to the >>>>> Flink cluster. >>>>> However when I submit the job Flink cannot find the classes contained >>>>> in the "fat" jar..what should I do? Am I missing something in my code? >>>>> This is the current client code I'm testing: >>>>> >>>>> public static void main(String[] args) throws MalformedURLException { >>>>> final Configuration flinkConf = new Configuration(); >>>>> flinkConf.set(RestOptions.ADDRESS, "localhost"); >>>>> flinkConf.set(RestOptions.PORT, 8081); >>>>> >>>>> final File jarFile = new File("/tmp/job-bundle.jar"); >>>>> final String jobClass = "it.flink.MyJob"; >>>>> >>>>> try { >>>>> final RestClusterClient<StandaloneClusterId> client = >>>>> new RestClusterClient<>(flinkConf, >>>>> StandaloneClusterId.getInstance()); >>>>> >>>>> final PackagedProgram packagedProgram = >>>>> PackagedProgram.newBuilder()// >>>>> .setJarFile(jarFile)// >>>>> // .setUserClassPaths(userClassPaths) >>>>> >>>>> .setEntryPointClassName(jobClass).setConfiguration(flinkConf)// >>>>> .build(); >>>>> >>>>> final JobGraph jobGraph = >>>>> PackagedProgramUtils.createJobGraph(packagedProgram, >>>>> flinkConf, 1, true); >>>>> >>>>> final DetachedJobExecutionResult jobExecutionResult = >>>>> >>>>> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get(); >>>>> >>>>> System.out.println(jobExecutionResult.getJobID()); >>>>> } catch (Exception ex) { >>>>> ex.printStackTrace(); >>>>> System.exit(1); >>>>> } >>>>> } >>>>> >>>>> Best, >>>>> Flavio >>>>> >>>> >>> >>> >