Still the same problem:

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with

    flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.

On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org>
wrote:

> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things
> outside the usercode classloader (getPipelineFromProgram()), specifically
> the call to the main method.
>
> @klou This seems like wrong behavior?
>
> @Flavio What you could try in the meantime is wrap the call to
> createJobGraph like this:
>
> final ClassLoader contextClassLoader =
>     Thread.currentThread().getContextClassLoader();
> try {
>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>     // do stuff
> } finally {
>     Thread.currentThread().setContextClassLoader(contextClassLoader);
> }
>
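
A minimal sketch of that wrapper as a reusable helper, using only the JDK. The class name ContextClassLoaderSwap and the method callWith are hypothetical names chosen for illustration, and the empty URLClassLoader in main is only a stand-in for packagedProgram.getUserCodeClassLoader():

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

public final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {}

    /**
     * Runs the action with the given loader installed as the thread context
     * class loader and restores the previous loader afterwards, even if the
     * action throws.
     */
    public static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: an empty URLClassLoader stands in for the user-code class loader.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0])) {
            final ClassLoader seen =
                callWith(userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("user loader active inside the call: " + (seen == userLoader));
        }
    }
}
```

In the client code quoted further down, the call would presumably become callWith(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true)). Whether that alone resolves the ClassNotFoundException is not confirmed in this thread.
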
>
> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>
> Any help here?  How can I understand why the classes inside the jar are
> not found when creating the PackagedProgram?
>
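
One way to rule out a packaging problem is to check whether the jar really contains the class file. The following is a minimal JDK-only sketch; JarClassCheck and its methods are hypothetical names, and the default path and class name are simply the ones mentioned in this thread:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

public final class JarClassCheck {

    private JarClassCheck() {}

    /** Maps a binary class name such as it.test.MyOb to its jar entry name. */
    public static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar has an entry for the given class. */
    public static boolean containsClass(File jar, String className) throws IOException {
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryNameFor(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        final File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        final String className = args.length > 1 ? args[1] : "it.test.MyOb";
        if (!jar.isFile()) {
            System.out.println("jar not found: " + jar);
            return;
        }
        System.out.println(className + " present in " + jar + ": " + containsClass(jar, className));
    }
}
```

A positive result only shows that the class file is packaged. A NoClassDefFoundError can still occur if the class is requested through a class loader that does not see the jar, or if one of its own dependencies is missing.
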
> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> In the logs I see that the jar is on the classpath (I'm trying to debug the
>> program from the IDE), isn't it?
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>> ...
>>
>> Best,
>> Flavio
>>
>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org>
>> wrote:
>>
>>> * your JobExecutor is _not_ putting it on the classpath.
>>>
>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>
>>> Well it happens on the client before you even hit the RestClusterClient,
>>> so I assume that either your jar is not packaged correctly or your
>>> JobExecutor is putting it on the classpath.
>>>
>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>
>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>>> class I'm trying to use as a client towards the Flink cluster - session
>>> mode).
>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>
>>> The code of getBatchEnv is:
>>>
>>>   @Deprecated
>>>   public static BatchEnv getBatchEnv() {
>>>     // TODO use the following when ready to convert from/to datastream
>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>     customizeEnv(ret);
>>>     return new BatchEnv(env, ret);
>>>   }
>>>
>>>   private static void customizeEnv(TableEnvironment ret) {
>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); // NOSONAR
>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); // NOSONAR
>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); // NOSONAR
>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>   }
>>>
>>> Classpath: [file:/tmp/job-bundle.jar]
>>>
>>> System.out: (none)
>>>
>>> System.err: (none)
>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>> ... 3 more
>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>> ... 13 more
>>>
>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org>
>>> wrote:
>>>
>>>> Hi Flavio,
>>>> can you share the full stacktrace you are seeing? I'm wondering if the
>>>> error happens on the client or server side (among other questions I have).
>>>>
>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I was trying to use the RestClusterClient to submit my job to the
>>>>> Flink cluster.
>>>>> However, when I submit the job, Flink cannot find the classes contained
>>>>> in the "fat" jar. What should I do? Am I missing something in my code?
>>>>> This is the current client code I'm testing:
>>>>>
>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>     final Configuration flinkConf = new Configuration();
>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>
>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>
>>>>>     try {
>>>>>       final RestClusterClient<StandaloneClusterId> client =
>>>>>           new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>
>>>>>       final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>           .setJarFile(jarFile)//
>>>>>           // .setUserClassPaths(userClassPaths)
>>>>>           .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>           .build();
>>>>>
>>>>>       final JobGraph jobGraph =
>>>>>           PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>
>>>>>       final DetachedJobExecutionResult jobExecutionResult =
>>>>>           client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>
>>>>>       System.out.println(jobExecutionResult.getJobID());
>>>>>     } catch (Exception ex) {
>>>>>       ex.printStackTrace();
>>>>>       System.exit(1);
>>>>>     }
>>>>> }
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>
>>>
>
