Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor isn't putting it on the classpath.
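To rule out the packaging side quickly, you could check whether the class actually has a .class entry in the fat jar. A minimal, self-contained sketch (it builds a throwaway jar with a fake it/test/MyOb entry just to demonstrate the check; point containsClass at /tmp/job-bundle.jar in your case):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

public class JarCheck {

    /** Returns true if the class (dot notation) has a .class entry in the jar. */
    static boolean containsClass(File jar, String className) throws Exception {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jf = new JarFile(jar)) {
            return jf.getJarEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Build a throwaway jar with one fake entry so the check is demonstrable.
        File jar = File.createTempFile("bundle", ".jar");
        try (JarOutputStream jos = new JarOutputStream(new FileOutputStream(jar))) {
            jos.putNextEntry(new JarEntry("it/test/MyOb.class"));
            jos.closeEntry();
        }
        System.out.println(containsClass(jar, "it.test.MyOb"));
        System.out.println(containsClass(jar, "it.test.Missing"));
        jar.delete();
    }
}
```

If the entry is missing, the problem is the shade/assembly configuration; if it is present, the problem is more likely on the classloading side of your JobExecutor.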

On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster, in session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); // NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); // NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); // NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <rmetz...@apache.org> wrote:

    Hi Flavio,
    can you share the full stacktrace you are seeing? I'm wondering if
    the error happens on the client or server side (among other
    questions I have).

    On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier
    <pomperma...@okkam.it> wrote:

        Hi to all,
        I was trying to use the RestClusterClient to submit my job to
        the Flink cluster.
        However when I submit the job Flink cannot find the classes
        contained in the "fat" jar..what should I do? Am I missing
        something in my code?
        This is the current client code I'm testing:

        public static void main(String[] args) throws MalformedURLException {
            final Configuration flinkConf = new Configuration();
            flinkConf.set(RestOptions.ADDRESS, "localhost");
            flinkConf.set(RestOptions.PORT, 8081);

            final File jarFile = new File("/tmp/job-bundle.jar");
            final String jobClass = "it.flink.MyJob";

            try {
              final RestClusterClient<StandaloneClusterId> client =
                  new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());

              final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
                  .setJarFile(jarFile)//
                  // .setUserClassPaths(userClassPaths)
                  .setEntryPointClassName(jobClass)//
                  .setConfiguration(flinkConf)//
                  .build();

              final JobGraph jobGraph =
                  PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);

              final DetachedJobExecutionResult jobExecutionResult =
                  client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();

              System.out.println(jobExecutionResult.getJobID());
            } catch (Exception ex) {
              ex.printStackTrace();
              System.exit(1);
            }
        }
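        For reference, the commented-out .setUserClassPaths(...) line takes a
        List<URL>; a minimal sketch of building one that points at the fat jar
        (illustrative only, not a confirmed fix for the error above):

```java
import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class UserClassPathSketch {
    public static void main(String[] args) throws Exception {
        File jarFile = new File("/tmp/job-bundle.jar");
        // setUserClassPaths expects a List<URL>; listing the fat jar itself
        // makes it visible to the user-code classloader in addition to setJarFile.
        List<URL> userClassPaths = Collections.singletonList(jarFile.toURI().toURL());
        System.out.println(userClassPaths.get(0).getProtocol());
    }
}
```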

        Best,
        Flavio

