Yes, it is definitely much easier to upload & run jars than to submit JobGraphs.

But I thought this was not viable for you because you cannot execute anything after env.execute()? I believe this limitation still exists. Or are you referring here to error-handling in case env.execute() throws an exception (which I think should work)?

Finally, I have to point out that I already raised the possibility of the jar not being packaged correctly twice in this thread, 2 and 3 days ago respectively. We could have saved some time here had you checked whether the jar actually contains the class.
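(Checking this requires no deployment at all; a minimal sketch of inspecting a jar programmatically, where the jar path and class name are placeholders:)

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarContains {
    // Convert a fully-qualified class name to its entry path inside a jar.
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // True if the jar at jarPath actually contains the given class.
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }
}
```

For example, `containsClass("/tmp/job-bundle.jar", "it.test.MyOb")` would answer the question directly.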

On 10/30/2020 12:24 PM, Flavio Pompermaier wrote:
I just discovered that I was using the "slim" jar instead of the "fat" one...sorry. Now I'm able to successfully run the program on the remote cluster. However, I really don't like that the job graph is generated on the client side, because it requires access to the Flink dist, to env variables (such as the Hadoop ones), and to the user jar, which is not really neat to me. But if I understood correctly, in Flink 1.12 I'll be able to use only the Job Manager REST API to run the job (since the limitation around job-submission failures will be handled), is that correct?

Thanks for the support,
Flavio
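(The REST-only flow discussed above, uploading the jar and then triggering a run, needs no Flink client at all. A sketch of assembling the run call by hand; the endpoint paths follow the JobManager REST API, while host, port, jar id, and entry class are placeholders:)

```java
// Sketch: building the "run uploaded jar" REST request without any Flink client.
public class RestRunRequest {
    // POST target for running a previously uploaded jar.
    static String runUrl(String host, int port, String jarId) {
        return "http://" + host + ":" + port + "/jars/" + jarId + "/run";
    }

    // JSON body selecting the entry class and parallelism.
    static String runBody(String entryClass, int parallelism) {
        return "{\"entryClass\":\"" + entryClass + "\",\"parallelism\":" + parallelism + "}";
    }
}
```

Any plain HTTP client (e.g. the Spring REST template mentioned below) can POST this body to the returned URL.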

On Fri, Oct 30, 2020 at 11:37 AM Chesnay Schepler <ches...@apache.org <mailto:ches...@apache.org>> wrote:

    Can you give me more information on your packaging setup / project
    structure? Is "it.test.MyOb" a test class? Does the dependency
    containing this class have a "test" scope?
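(For reference: with Maven, a class ends up in the fat jar only if its dependency is on the compile/runtime classpath; test-scoped dependencies are excluded from a shaded jar. A minimal, hypothetical maven-shade-plugin setup:)

```xml
<!-- Sketch of a typical fat-jar build; version management omitted. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
    </execution>
  </executions>
</plugin>
```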

    On 10/30/2020 11:34 AM, Chesnay Schepler wrote:
    It is irrelevant whether it contains transitive dependencies or
    not; that's a Maven concept, not a classloading one.

    The WordCount main class, which is only contained in that jar,
    could be found, so the classloading is working. If any other
    class that is supposed to be in the jar cannot be found, then
    that means the class is either not in the jar, or some other
    transitive dependency is missing from the jar. (Classloading
    exceptions can be a bit misleading at times, particularly when
    accessing transitive dependencies in static fields, IIRC.)

    > Actually I was able to use the REST API without creating the
    JobGraph

    I'm not debating that, and pointed that out myself.
    > [without a job graph you] cannot use the REST API *(outside of
    uploading jars)*

    On 10/30/2020 11:22 AM, Flavio Pompermaier wrote:
    Yes, with the WordCount it works, but that jar is not a "fat" jar
    (it does not include transitive dependencies).
    Actually, I was able to use the REST API without creating the
    JobGraph: you just have to tell the run API the jar id, the main
    class to run, and the optional parameters.
    For this I don't use any official Flink client; I use the Spring
    REST template and implemented the service calls myself.

    On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler
    <ches...@apache.org <mailto:ches...@apache.org>> wrote:

        If you aren't setting up the classpath correctly then you
        cannot create a JobGraph, and cannot use the REST API
        (outside of uploading jars).
        In other words, you _will_ have to solve this issue, one way
        or another.

        FYI, I just tried your code to submit a WordCount jar to a
        cluster (the one in the distribution), and it worked
        flawlessly. Please triple check your packaging and class
        references.

        On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
        By "REST only client" I mean using only the REST API to
        interact with the Flink cluster, i.e. without creating any
        PackagedProgram and thus running into classpath problems.
        I had implemented a job server that used the REST API to
        upload the job jar and execute the run command, but then I
        gave up because I was not able to run any code after
        env.execute(), so I ended up SSHing into the remote server
        and using the CLI client. That has the limitation of not
        being able to get the job id, monitor the job status, or get
        back exceptions when deploying the job.
        So now I was trying to explore this new way of submitting
        the job (computing the JobGraph on the client side and
        submitting it to the cluster).



        On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler
        <ches...@apache.org <mailto:ches...@apache.org>> wrote:

            To clarify, if the job creation fails on the JM side,
            in 1.11 the job submission will fail, in 1.12 it will
            succeed but the job will be in a failed state.

            On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
            1) the job still reaches a failed state, which you can
            poll for, see 2)
            2) polling is your only way.

            What do you mean with "REST only client"? Do you mean
            a plain http client, not something that Flink provides?

            On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
            No luck with IntelliJ either. Do you have any sample
            project I can reuse to test the job submission to a
            cluster?
            I can't really understand why the classes within the
            fat jar are not found when generating the
            PackagedProgram.
            Ideally, I'd prefer a REST-only client (so no need to
            build packaged programs and introduce classpath
            problems), but I have 2 questions:

              * I remember that when submitting jobs from REST
                there's no way to detect failures in the job
                creation (like missing classes, classpath
                problems, etc). Am I wrong?
              * I'd like to monitor the progress of my batch job
                (for example I can count the number of completed
                vertices wrt the total count of vertices). Is
                there any suggested way to do that apart from
                polling?
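(On the second question: the per-vertex status exposed by the job-details REST endpoint is enough to derive a rough progress figure while polling. A sketch; the crude string scan stands in for real JSON parsing, and the field names are assumptions:)

```java
// Sketch: deriving batch-job progress from a polled job-details response
// that lists vertices with their status.
public class JobProgress {
    // Count vertices reported as FINISHED in the JSON response.
    static int countFinishedVertices(String jobJson) {
        int count = 0;
        int from = 0;
        while ((from = jobJson.indexOf("\"status\":\"FINISHED\"", from)) >= 0) {
            count++;
            from += 1; // continue scanning after this match
        }
        return count;
    }

    // Fraction of completed vertices, guarding against division by zero.
    static double progress(int finishedVertices, int totalVertices) {
        return totalVertices == 0 ? 0.0 : (double) finishedVertices / totalVertices;
    }
}
```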

            Best,
            Flavio

            On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier
            <pomperma...@okkam.it <mailto:pomperma...@okkam.it>>
            wrote:

                I'm running the code from Eclipse; the jar exists
                and it contains the classes Flink is not finding.
                Maybe I can try to use IntelliJ in the afternoon.

                On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler
                <ches...@apache.org <mailto:ches...@apache.org>>
                wrote:

                    @Kostas: Ah, I missed that.

                    @Flavio: the only alternative I can think of is
                    that your jar does not contain the classes, or
                    does not exist at all on the machine your
                    application is run on.

                    On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
                    > Hi all,
                    >
                    > I will have a look in the whole stack trace
                    in a bit.
                    >
                    > @Chesnay Schepler I think that we are
                    setting the correct classloader
                    > during jobgraph creation [1]. Is that what
                    you mean?
                    >
                    > Cheers,
                    > Kostas
                    >
                    > [1]
                    
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
                    >
                    > On Wed, Oct 28, 2020 at 11:02 AM Flavio
                    Pompermaier
                    > <pomperma...@okkam.it
                    <mailto:pomperma...@okkam.it>> wrote:
                    >> Always the same problem.
                    >>
                    >> Caused by: java.lang.ClassNotFoundException: it.test.XXX
                    >>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
                    >>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
                    >>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
                    >>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                    >>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                    >>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
                    >> ... 10 more
                    >>
                    >> I've also tried with
                    >>
                    >> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
                    >>
                    >> but nothing changes.
                    >>
                    >> On Wed, Oct 28, 2020 at 10:34 AM Chesnay
                    Schepler <ches...@apache.org
                    <mailto:ches...@apache.org>> wrote:
                    >>> Hmm, it appears as if
                    PackagedProgramUtils#createJobGraph does some
                    things outside the usercode classloader
                    (getPipelineFromProgram()), specifically the
                    call to the main method.
                    >>>
                    >>> @klou This seems like wrong behavior?
                    >>>
                    >>> @Flavio What you could try in the
                    meantime is wrap the call to createJobGraph
                    like this:
                    >>>
                    >>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
                    >>> try {
                    >>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
                    >>>     // do stuff
                    >>> } finally {
                    >>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
                    >>> }
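(The snippet above is the usual context-classloader swap pattern; a self-contained, reusable version, with a generic Supplier standing in for the createJobGraph call, might look like:)

```java
import java.util.function.Supplier;

public class ClassLoaderScope {
    // Run an action with the given context classloader, restoring the
    // previous one afterwards even if the action throws.
    static <T> T withContextClassLoader(ClassLoader cl, Supplier<T> action) {
        final ClassLoader previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(cl);
        try {
            return action.get();
        } finally {
            Thread.currentThread().setContextClassLoader(previous);
        }
    }
}
```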
                    >>>
                    >>>
                    >>> On 10/28/2020 10:12 AM, Flavio
                    Pompermaier wrote:
                    >>>
                    >>> Any help here?  How can I understand why
                    the classes inside the jar are not found when
                    creating the PackagedProgram?
                    >>>
                    >>> On Tue, Oct 27, 2020 at 11:04 AM Flavio
                    Pompermaier <pomperma...@okkam.it
                    <mailto:pomperma...@okkam.it>> wrote:
                    >>>> In the logs I see that the jar is on the
                    >>>> classpath (I'm trying to debug the program
                    >>>> from the IDE), isn't it?
                    >>>>
                    >>>> Classpath: [file:/tmp/job-bundle.jar]
                    >>>> ...
                    >>>>
                    >>>> Best,
                    >>>> Flavio
                    >>>>
                    >>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay
                    Schepler <ches...@apache.org
                    <mailto:ches...@apache.org>> wrote:
                    >>>>> * your JobExecutor is _not_ putting it
                    on the classpath.
                    >>>>>
                    >>>>> On 10/27/2020 10:36 AM, Chesnay
                    Schepler wrote:
                    >>>>>
                    >>>>> Well, it happens on the client before you
                    even hit the RestClusterClient, so I assume that
                    either your jar is not packaged correctly or
                    your JobExecutor is putting it on the classpath.
                    >>>>>
                    >>>>> On 10/27/2020 9:42 AM, Flavio
                    Pompermaier wrote:
                    >>>>>
                    >>>>> Sure. Here it is
                    (org.apache.flink.client.cli.JobExecutor is the
                    main class I'm trying to use as a client towards
                    the Flink cluster - session mode).
                    >>>>> it/test/MyOb is within the fat jar
                    (/tmp/job-bundle.jar).
                    >>>>>
                    >>>>> The code of getBatchEnv is:
                    >>>>>
                    >>>>> @Deprecated
                    >>>>> public static BatchEnv getBatchEnv() {
                    >>>>>   // TODO use the following when ready to convert from/to datastream
                    >>>>>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
                    >>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    >>>>>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
                    >>>>>   customizeEnv(ret);
                    >>>>>   return new BatchEnv(env, ret);
                    >>>>> }
                    >>>>>
                    >>>>> private static void customizeEnv(TableEnvironment ret) {
                    >>>>>   final Configuration conf = ret.getConfig().getConfiguration();
                    >>>>>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
                    >>>>>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
                    >>>>>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
                    >>>>>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
                    >>>>>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); //NOSONAR
                    >>>>>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); //NOSONAR
                    >>>>>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
                    >>>>>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
                    >>>>>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
                    >>>>>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
                    >>>>>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
                    >>>>>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
                    >>>>>   final List<String> kryoSerializers = new ArrayList<>();
                    >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
                    >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
                    >>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
                    >>>>>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
                    >>>>> }
                    >>>>>
                    >>>>> Classpath: [file:/tmp/job-bundle.jar]
                    >>>>>
                    >>>>> System.out: (none)
                    >>>>>
                    >>>>> System.err: (none)
                    >>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
                    >>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
                    >>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
                    >>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
                    >>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
                    >>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
                    >>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
                    >>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
                    >>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
                    >>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                    >>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                    >>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                    >>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                    >>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
                    >>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
                    >>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
                    >>>>> ... 3 more
                    >>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
                    >>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
                    >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
                    >>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
                    >>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                    >>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                    >>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
                    >>>>> ... 13 more
                    >>>>>
                    >>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert
                    Metzger <rmetz...@apache.org
                    <mailto:rmetz...@apache.org>> wrote:
                    >>>>>> Hi Flavio,
                    >>>>>> can you share the full stacktrace you
                    are seeing? I'm wondering if the error
                    happens on the client or server side (among
                    other questions I have).
                    >>>>>>
                    >>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio
                    Pompermaier <pomperma...@okkam.it
                    <mailto:pomperma...@okkam.it>> wrote:
                    >>>>>>> Hi to all,
                    >>>>>>> I was trying to use the
                    RestClusterClient to submit my job to the
                    Flink cluster.
                    >>>>>>> However, when I submit the job, Flink
                    cannot find the classes contained in the "fat"
                    jar. What should I do? Am I missing something in
                    my code?
                    >>>>>>> This is the current client code I'm
                    testing:
                    >>>>>>>
                    >>>>>>> public static void main(String[] args) throws MalformedURLException {
                    >>>>>>>   final Configuration flinkConf = new Configuration();
                    >>>>>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
                    >>>>>>>   flinkConf.set(RestOptions.PORT, 8081);
                    >>>>>>>
                    >>>>>>>   final File jarFile = new File("/tmp/job-bundle.jar");
                    >>>>>>>   final String jobClass = "it.flink.MyJob";
                    >>>>>>>
                    >>>>>>>   try {
                    >>>>>>>     final RestClusterClient<StandaloneClusterId> client =
                    >>>>>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
                    >>>>>>>
                    >>>>>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()
                    >>>>>>>         .setJarFile(jarFile)
                    >>>>>>>         // .setUserClassPaths(userClassPaths)
                    >>>>>>>         .setEntryPointClassName(jobClass)
                    >>>>>>>         .setConfiguration(flinkConf)
                    >>>>>>>         .build();
                    >>>>>>>
                    >>>>>>>     final JobGraph jobGraph =
                    >>>>>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
                    >>>>>>>
                    >>>>>>>     final DetachedJobExecutionResult jobExecutionResult =
                    >>>>>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
                    >>>>>>>
                    >>>>>>>     System.out.println(jobExecutionResult.getJobID());
                    >>>>>>>   } catch (Exception ex) {
                    >>>>>>>     ex.printStackTrace();
                    >>>>>>>     System.exit(1);
                    >>>>>>>   }
                    >>>>>>> }
                    >>>>>>>
                    >>>>>>> Best,
                    >>>>>>> Flavio
                    >>>>>
                    >>>>>






