Any help here?  How can I understand why the classes inside the jar are not
found when creating the PackagedProgram?

On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> In the logs I see that the jar is the classpath (I'm trying to debug the
> program from the IDE)..isn'it?
>
> Classpath: [file:/tmp/job-bundle.jar]
> ...
>
> Best,
> Flavio
>
> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org>
> wrote:
>
>> * your JobExecutor is _not_ putting it on the classpath.
>>
>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>
>> Well it happens on the client before you even hit the RestClusterClient,
>> so I assume that either your jar is not packaged correctly or you your
>> JobExecutor is putting it on the classpath.
>>
>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>
>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main
>> class I'm trying to use as a client towards the Flink cluster - session
>> mode).
>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>
>> The code of getBatchEnv is:
>>
>> @Deprecated
>>   public static BatchEnv getBatchEnv() {
>>     // TODO use the following when ready to convert from/to datastream
>>     // return
>> getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>     ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();
>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>     customizeEnv(ret);
>>     return new BatchEnv(env, ret);
>>   }
>>
>>   private static void customizeEnv(TableEnvironment ret) {
>>     final Configuration conf = ret.getConfig().getConfiguration();
>>     //
>> conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
>> 2);
>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY,
>> FLINK_TEST_TMP_DIR);
>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4);
>> //NOSONAR
>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
>> 0.4f);//NOSONAR
>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768
>> * 2);//NOSONAR
>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 *
>> 2);// NOSONAR
>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);//
>> NOSONAR
>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));//
>> NOSONAR
>>     final List<String> kryoSerializers = new ArrayList<>();
>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class,
>> JodaDateTimeSerializer.class));
>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class,
>> TBaseSerializer.class));
>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class,
>> TBaseSerializer.class));
>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>
>>   }
>>
>> Classpath: [file:/tmp/job-bundle.jar]
>>
>> System.out: (none)
>>
>> System.err: (none)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>> at
>> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>> at
>> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>> ... 3 more
>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 13 more
>>
>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Flavio,
>>> can you share the full stacktrace you are seeing? I'm wondering if the
>>> error happens on the client or server side (among other questions I have).
>>>
>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> I was trying to use the RestClusterClient to submit my job to the Flink
>>>> cluster.
>>>> However when I submit the job Flink cannot find the classes contained
>>>> in the "fat" jar..what should I do? Am I missing something in my code?
>>>> This is the current client code I'm testing:
>>>>
>>>> public static void main(String[] args) throws MalformedURLException {
>>>>     final Configuration flinkConf = new Configuration();
>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>
>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>     final String jobClass = "it.flink.MyJob";
>>>>
>>>>     try {
>>>>       final RestClusterClient<StandaloneClusterId> client =
>>>>           new RestClusterClient<>(flinkConf,
>>>> StandaloneClusterId.getInstance());
>>>>
>>>>       final PackagedProgram packagedProgram =
>>>> PackagedProgram.newBuilder()//
>>>>           .setJarFile(jarFile)//
>>>>           // .setUserClassPaths(userClassPaths)
>>>>
>>>> .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>           .build();
>>>>
>>>>       final JobGraph jobGraph =
>>>>           PackagedProgramUtils.createJobGraph(packagedProgram,
>>>> flinkConf, 1, true);
>>>>
>>>>       final DetachedJobExecutionResult jobExecutionResult =
>>>>
>>>> client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>
>>>>       System.out.println(jobExecutionResult.getJobID());
>>>>     } catch (Exception ex) {
>>>>       ex.printStackTrace();
>>>>       System.exit(1);
>>>>     }
>>>> }
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>
>>

Reply via email to