Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> wrote: > In the logs I see that the jar is the classpath (I'm trying to debug the > program from the IDE)..isn'it? > > Classpath: [file:/tmp/job-bundle.jar] > ... > > Best, > Flavio > > On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> > wrote: > >> * your JobExecutor is _not_ putting it on the classpath. >> >> On 10/27/2020 10:36 AM, Chesnay Schepler wrote: >> >> Well it happens on the client before you even hit the RestClusterClient, >> so I assume that either your jar is not packaged correctly or you your >> JobExecutor is putting it on the classpath. >> >> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote: >> >> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main >> class I'm trying to use as a client towards the Flink cluster - session >> mode). >> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar). >> >> The code of getBatchEnv is: >> >> @Deprecated >> public static BatchEnv getBatchEnv() { >> // TODO use the following when ready to convert from/to datastream >> // return >> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build()); >> ExecutionEnvironment env = >> ExecutionEnvironment.getExecutionEnvironment(); >> BatchTableEnvironment ret = BatchTableEnvironment.create(env); >> customizeEnv(ret); >> return new BatchEnv(env, ret); >> } >> >> private static void customizeEnv(TableEnvironment ret) { >> final Configuration conf = ret.getConfig().getConfiguration(); >> // >> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, >> 2); >> conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR); >> conf.setString(BlobServerOptions.STORAGE_DIRECTORY, >> FLINK_TEST_TMP_DIR); >> // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); >> //NOSONAR >> // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, >> 0.4f);//NOSONAR >> // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 >> * 2);//NOSONAR >> // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * >> 2);// NOSONAR >> conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// >> NOSONAR >> conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR >> conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR >> conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR >> conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// >> NOSONAR >> final List<String> kryoSerializers = new ArrayList<>(); >> kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, >> JodaDateTimeSerializer.class)); >> kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, >> TBaseSerializer.class)); >> kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, >> TBaseSerializer.class)); >> conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers); >> >> } >> >> Classpath: [file:/tmp/job-bundle.jar] >> >> System.out: (none) >> >> System.err: (none) >> at >> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245) >> at >> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164) >> at >> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77) >> at >> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109) >> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42) >> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb >> at >> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116) >> at >> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95) >> at >> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73) >> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >> Method) >> at >> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >> at >> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288) >> at >> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) >> at >> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150) >> ... 3 more >> Caused by: java.lang.ClassNotFoundException: it/test/MyOb >> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) >> at >> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) >> at >> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) >> at >> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) >> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) >> ... 13 more >> >> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> >> wrote: >> >>> Hi Flavio, >>> can you share the full stacktrace you are seeing? I'm wondering if the >>> error happens on the client or server side (among other questions I have). >>> >>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> >>> wrote: >>> >>>> Hi to all, >>>> I was trying to use the RestClusterClient to submit my job to the Flink >>>> cluster. >>>> However when I submit the job Flink cannot find the classes contained >>>> in the "fat" jar..what should I do? Am I missing something in my code? >>>> This is the current client code I'm testing: >>>> >>>> public static void main(String[] args) throws MalformedURLException { >>>> final Configuration flinkConf = new Configuration(); >>>> flinkConf.set(RestOptions.ADDRESS, "localhost"); >>>> flinkConf.set(RestOptions.PORT, 8081); >>>> >>>> final File jarFile = new File("/tmp/job-bundle.jar"); >>>> final String jobClass = "it.flink.MyJob"; >>>> >>>> try { >>>> final RestClusterClient<StandaloneClusterId> client = >>>> new RestClusterClient<>(flinkConf, >>>> StandaloneClusterId.getInstance()); >>>> >>>> final PackagedProgram packagedProgram = >>>> PackagedProgram.newBuilder()// >>>> .setJarFile(jarFile)// >>>> // .setUserClassPaths(userClassPaths) >>>> >>>> .setEntryPointClassName(jobClass).setConfiguration(flinkConf)// >>>> .build(); >>>> >>>> final JobGraph jobGraph = >>>> PackagedProgramUtils.createJobGraph(packagedProgram, >>>> flinkConf, 1, true); >>>> >>>> final DetachedJobExecutionResult jobExecutionResult = >>>> >>>> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get(); >>>> >>>> System.out.println(jobExecutionResult.getJobID()); >>>> } catch (Exception ex) { >>>> ex.printStackTrace(); >>>> System.exit(1); >>>> } >>>> } >>>> >>>> Best, >>>> Flavio >>>> >>> >> >>