Hi all 
   I want to test to submit a job from my local IDE and I deployed a Flink 
cluster in my vm. 
   Here is my code from Flink 1.9 document and add some of my parameters.
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment
.createRemoteEnvironment("localhost", 8081, "/tmp/myudf.jar");

DataSet<String> data = env.readTextFile("/tmp/file");

data
.filter(new FilterFunction<String>() {
public boolean filter(String value) {
return value.startsWith("http://";);
                }
            })
.writeAsText("/tmp/file1");

env.execute();
    }

When I run the program, I raises the error like: 


Exception in thread "main" 
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 
1f32190552e955bb2048c31930edfb0e)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:301)
at 
org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:209)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:186)
at 
org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:173)
at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
at TestMain.main(TestMain.java:25)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 8 more
Caused by: java.lang.RuntimeException: The initialization of the DataSource's 
outputs caused an error: Could not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:109)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could 
not read the user code wrapper: TestMain$1
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
at 
org.apache.flink.runtime.operators.BatchTask.instantiateUserCode(BatchTask.java:1448)
at 
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39)
at 
org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1315)
at 
org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:317)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:106)
... 3 more
Caused by: java.lang.ClassNotFoundException: TestMain$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:69)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at 
org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
... 9 more


My understanding is that when using remote environment, I don’t need to upload 
my program jar to flink cluster. So can anyone help me for this issue ?


Thanks,
SImon

Reply via email to