No luck with IntelliJ either. Do you have any sample project I can reuse to test job submission to a cluster?
I can't understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so there's no need to build packaged programs and introduce classpath problems), but I have 2 questions:
* I remember that when submitting jobs via REST there's no way to detect failures in job creation (like missing classes, classpath problems, etc.). Am I wrong?
* I'd like to monitor the progress of my batch job (for example, by counting the number of completed vertices with respect to the total number of vertices). Is there any suggested way to do that apart from polling?
Best,
Flavio
On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
I'm running the code from Eclipse. The jar exists and it contains the classes Flink is not finding. Maybe I can try IntelliJ in the afternoon.
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org> wrote:
@Kostas: Ah, I missed that.
@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application is run on.
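One way to rule out both cases is to open the jar on the machine that builds the PackagedProgram and check for the class entry directly. Below is a minimal sketch that uses only `java.util.jar`. `JarClassCheck` is a hypothetical helper name. The sketch assumes the class is stored as a regular entry in the jar (not inside a nested jar) and that the name is given in dotted binary form.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: checks that a jar exists locally and contains a given class. */
final class JarClassCheck {

    private JarClassCheck() {}

    /**
     * Returns true if the jar exists on this machine and contains the .class entry
     * for the given binary class name (e.g. "it.test.MyOb" -> "it/test/MyOb.class").
     */
    static boolean jarContainsClass(File jar, String className) throws IOException {
        if (!jar.isFile()) {
            return false; // the jar is missing on the machine running the client
        }
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryName) != null;
        }
    }
}
```

For the jar in this thread, `JarClassCheck.jarContainsClass(new File("/tmp/job-bundle.jar"), "it.test.MyOb")` should return `true` if the fat jar is packaged as described.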
On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look in the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>     ... 10 more
>>
>> I've also tried with
>>
>> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote:
>>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // call PackagedProgramUtils.createJobGraph(...) here
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
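The same save/swap/restore pattern can be packaged as a reusable helper so the `finally` block can't be forgotten. This is a sketch that assumes the wrapped work fits in a `Callable`. `ContextClassLoaders` is a hypothetical name, not a Flink API.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper for temporarily swapping the thread context classloader. */
final class ContextClassLoaders {

    private ContextClassLoaders() {}

    /**
     * Runs the action with the given classloader as the current thread's context
     * classloader, restoring the previous one afterwards even if the action throws.
     */
    static <T> T callWithContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

With the objects from Flavio's client code, the call would look like `JobGraph jobGraph = ContextClassLoaders.callWithContextClassLoader(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true));`.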
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE)... isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure, here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster in session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>> public static BatchEnv getBatchEnv() {
>>>>>     // TODO use the following when ready to convert from/to datastream
>>>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>     customizeEnv(ret);
>>>>>     return new BatchEnv(env, ret);
>>>>> }
>>>>>
>>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>
>>>>> }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>     at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>     at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>     ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However, when I submit the job, Flink cannot find the classes contained in the "fat" jar. What should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>     try {
>>>>>>>         final RestClusterClient<StandaloneClusterId> client =
>>>>>>>             new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>         final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>             .setJarFile(jarFile)//
>>>>>>>             // .setUserClassPaths(userClassPaths)
>>>>>>>             .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>             .build();
>>>>>>>
>>>>>>>         final JobGraph jobGraph =
>>>>>>>             PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>         final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>             client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>         System.out.println(jobExecutionResult.getJobID());
>>>>>>>     } catch (Exception ex) {
>>>>>>>         ex.printStackTrace();
>>>>>>>         System.exit(1);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>
>>>>>