Nothing changes with IntelliJ either. Do you have any sample project I can reuse to test job submission to a cluster?
I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer a REST-only client (so there's no need to build packaged programs and introduce classpath problems), but I have two questions:
* I remember that when submitting jobs via REST there's no way to detect failures during job creation (missing classes, classpath problems, etc.). Am I wrong?
* I'd like to monitor the progress of my batch job (for example, by counting the completed vertices with respect to the total number of vertices). Is there any suggested way to do that apart from polling?
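For the second question, and only in case polling turns out to be acceptable, here is a minimal sketch of a client that polls the job-details endpoint of the Flink REST API and computes the fraction of finished vertices. It assumes that `GET /jobs/<jobId>` returns a JSON object with a job-level `state` field and a per-vertex `status` field, and that the REST endpoint is `http://localhost:8081`. Values are extracted with regular expressions only to keep the sketch dependency-free. The class and method names (`JobProgress`, `progress`, `jobState`) are hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobProgress {

    // Assumption: each vertex in the GET /jobs/<jobId> response has a "status" field
    // and the job itself has a "state" field. A real client should parse the JSON
    // with a proper library instead of regular expressions.
    private static final Pattern VERTEX_STATUS = Pattern.compile("\"status\"\\s*:\\s*\"([A-Z_]+)\"");
    private static final Pattern JOB_STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");
    private static final Set<String> TERMINAL = Set.of("FINISHED", "FAILED", "CANCELED");

    /** Fraction of vertices whose status is FINISHED (0.0 when no vertex is found). */
    public static double progress(String jobDetailsJson) {
        Matcher m = VERTEX_STATUS.matcher(jobDetailsJson);
        int total = 0;
        int finished = 0;
        while (m.find()) {
            total++;
            if ("FINISHED".equals(m.group(1))) {
                finished++;
            }
        }
        return total == 0 ? 0.0 : (double) finished / total;
    }

    /** First "state" value in the response, assumed to be the job state. */
    public static String jobState(String jobDetailsJson) {
        Matcher m = JOB_STATE.matcher(jobDetailsJson);
        return m.find() ? m.group(1) : "UNKNOWN";
    }

    /** One GET request against the job-details endpoint. */
    static String fetchJobDetails(HttpClient http, String restBase, String jobId)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId)).GET().build();
        return http.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    // Usage: java JobProgress <jobId>; polls every 5 seconds until a terminal state.
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        while (true) {
            String json = fetchJobDetails(http, "http://localhost:8081", args[0]);
            System.out.printf("%s %.0f%%%n", jobState(json), 100 * progress(json));
            if (TERMINAL.contains(jobState(json))) {
                break;
            }
            Thread.sleep(5_000);
        }
    }
}
```

The regex for vertex statuses requires the key to be exactly `"status"`, so an aggregate field such as `status-counts` is not counted twice.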
Best,
Flavio
On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding. Maybe I can try IntelliJ this afternoon.
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <ches...@apache.org> wrote:
@Kostas: Ah, I missed that.
@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application is run on.
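Both of these alternatives can be checked directly on the machine that runs the client. A minimal sketch using only `java.util.jar.JarFile`; the class name `JarCheck` and the default arguments are illustrative, and the check only looks for the `.class` entry itself, not for the classes it depends on:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {

    /** True if the jar exists and has an entry for the given binary class name. */
    public static boolean containsClass(File jar, String className) throws IOException {
        if (!jar.isFile()) {
            return false; // the jar does not exist on this machine
        }
        // "it.test.MyOb" -> "it/test/MyOb.class"; nested classes keep their '$'
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String cls = args.length > 1 ? args[1] : "it.test.MyOb";
        System.out.println(cls + " in " + jar + ": " + containsClass(jar, cls));
    }
}
```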
On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look at the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>     ... 10 more
>>
>> I've also tried with
>>
>>
>>     flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
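As far as I understand, `classloader.resolve-order` only changes the order in which the parent class loader and the jar are searched, so switching to `parent-first` cannot help if the class is visible to neither. A JDK-only way to check whether the jar alone can supply a class is to load it through a fresh `URLClassLoader`. This is a hypothetical diagnostic (`LoadFromJar`, `canLoad`), not Flink's `FlinkUserCodeClassLoader`; using the platform class loader as parent is a deliberate assumption so that the IDE classpath cannot hide a class that is missing from the jar:

```java
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

public class LoadFromJar {

    /**
     * True if className can be loaded from the jar alone (plus the JDK).
     * Classes that are only on the application/IDE classpath are NOT visible here.
     */
    public static boolean canLoad(File jar, String className) throws IOException {
        URL[] urls = {jar.toURI().toURL()};
        try (URLClassLoader loader = new URLClassLoader(urls, ClassLoader.getPlatformClassLoader())) {
            // initialize = false: load the class without running its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError also covers NoClassDefFoundError for a missing dependency
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String cls = args.length > 1 ? args[1] : "it.test.MyOb";
        System.out.println(cls + " loadable from " + jar + ": " + canLoad(jar, cls));
    }
}
```

If this prints `false` for a class that `JarFile` shows inside the jar, the likely culprit is a dependency of that class rather than the class itself.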
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <ches...@apache.org> wrote:
>>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // call PackagedProgramUtils.createJobGraph(...) here
>>> } finally {
>>>     // always restore the original context classloader
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
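The same save/set/restore pattern can be factored into a small helper. `ContextClassLoaders.callWith` is a hypothetical name, not a Flink API. Note that the context class loader only affects code that looks it up explicitly (for example `ServiceLoader`, or libraries calling `Thread.getContextClassLoader()`); a direct reference such as a class literal is resolved through the class loader that defined the referencing class, so this workaround may not cover every failure.

```java
import java.util.concurrent.Callable;

public final class ContextClassLoaders {

    private ContextClassLoaders() {}

    /** Runs action with loader as the thread context class loader, then restores the previous one. */
    public static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // restored even if action throws
            thread.setContextClassLoader(previous);
        }
    }
}
```

Applied to the code in this thread, the call would look like `ContextClassLoaders.callWith(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true))`.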
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE), isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure, here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm using as a client towards the Flink cluster, in session mode).
>>>>> it/test/MyOb is inside the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>> public static BatchEnv getBatchEnv() {
>>>>>     // TODO use the following when ready to convert from/to datastream
>>>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>     customizeEnv(ret);
>>>>>     return new BatchEnv(env, ret);
>>>>> }
>>>>>
>>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); //NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); //NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
>>>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>> }
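The body of `getKryoSerializerConfigLine` is not shown in the thread. For `pipeline.default-kryo-serializers`, Flink documents entries of the form `class:<type>,serializer:<serializer>`, so a plausible stand-in looks like the sketch below (`KryoConfigLines` and `configLine` are hypothetical names). The string-based overload builds the same entry without referencing a class literal. Evaluating a class literal such as `EntitonAtom.class` forces that class to be loaded when `customizeEnv` runs, which may matter if line 116 of `DatalinksExecutionEnvironment` is one of those lines; the thread does not confirm that.

```java
import java.util.ArrayList;
import java.util.List;

public class KryoConfigLines {

    /** Hypothetical stand-in: one "class:...,serializer:..." entry built from class objects. */
    public static String configLine(Class<?> type, Class<?> serializer) {
        return "class:" + type.getName() + ",serializer:" + serializer.getName();
    }

    /** Same entry built from class names only, so neither class is loaded here. */
    public static String configLine(String typeName, String serializerName) {
        return "class:" + typeName + ",serializer:" + serializerName;
    }

    public static void main(String[] args) {
        List<String> entries = new ArrayList<>();
        // "org.example.MySerializer" is a placeholder, not a real class
        entries.add(configLine("it.test.MyOb", "org.example.MySerializer"));
        System.out.println(entries);
    }
}
```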
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>     at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>     at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>     ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However, when I submit the job, Flink cannot find the classes contained in the "fat" jar. What should I do? Am I missing something in my code?
>>>>>>> This is the client code I'm currently testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>     try {
>>>>>>>         final RestClusterClient<StandaloneClusterId> client =
>>>>>>>             new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>         final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>             .setJarFile(jarFile)//
>>>>>>>             // .setUserClassPaths(userClassPaths)
>>>>>>>             .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>             .build();
>>>>>>>
>>>>>>>         final JobGraph jobGraph =
>>>>>>>             PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>         final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>             client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>         System.out.println(jobExecutionResult.getJobID());
>>>>>>>     } catch (Exception ex) {
>>>>>>>         ex.printStackTrace();
>>>>>>>         System.exit(1);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
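For the REST-only route mentioned at the top of the thread, the jar can be uploaded with `curl -X POST -H "Expect:" -F "jarfile=@/tmp/job-bundle.jar" http://localhost:8081/jars/upload` and then started with `POST /jars/<jarId>/run`. Below is a minimal sketch of the second step. It assumes that the upload response carries a `filename` whose last path segment is the jar id, and that the run request accepts `entryClass` and `parallelism`; these field names and the sample values are assumptions to verify against the REST API docs of your Flink version. Because `main()` then runs on the cluster side, a failure during job creation should, as far as I know, come back as a non-2xx response of the run call.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RestRun {

    private static final Pattern FILENAME = Pattern.compile("\"filename\"\\s*:\\s*\"([^\"]+)\"");

    /** Assumed jar id: last path segment of the "filename" in the /jars/upload response. */
    public static String jarIdFromUploadResponse(String json) {
        Matcher m = FILENAME.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no filename in upload response: " + json);
        }
        String path = m.group(1);
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** JSON body for POST /jars/<jarId>/run; no escaping, so only for plain class names. */
    public static String runRequestBody(String entryClass, int parallelism) {
        return "{\"entryClass\":\"" + entryClass + "\",\"parallelism\":" + parallelism + "}";
    }

    static HttpResponse<String> run(HttpClient http, String restBase, String jarId, String body)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(restBase + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        return http.send(request, HttpResponse.BodyHandlers.ofString());
    }

    // Usage: java RestRun '<upload response JSON>'
    public static void main(String[] args) throws Exception {
        String jarId = jarIdFromUploadResponse(args[0]);
        HttpResponse<String> response = run(HttpClient.newHttpClient(), "http://localhost:8081",
                jarId, runRequestBody("it.flink.MyJob", 1));
        if (response.statusCode() / 100 != 2) {
            // job creation errors (e.g. missing classes) are expected to surface here
            System.err.println("submission failed: " + response.body());
            System.exit(1);
        }
        System.out.println(response.body()); // expected to contain the job id
    }
}
```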
>>>>>
>>>>>