[ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259320#comment-16259320 ]
ASF GitHub Bot commented on FLINK-7943:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4921

OptionalDataException when launching Flink jobs concurrently
-------------------------------------------------------------

                 Key: FLINK-7943
                 URL: https://issues.apache.org/jira/browse/FLINK-7943
             Project: Flink
          Issue Type: Bug
          Components: Client
    Affects Versions: 1.4.0
            Reporter: Till Rohrmann
            Assignee: Till Rohrmann

A user reported [1] that he gets an {{OptionalDataException}} when he launches multiple Flink jobs from the same program concurrently. The problem seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can be found below:

{code}
Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at java.util.HashMap.readObject(HashMap.java:1407)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
	at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

The user code causing the problem is:

{code}
@SuppressWarnings("serial")
public class UnionThreaded {

    static int ThreadPoolSize = 3;
    static int JobsPerThread = 2;
    static ParameterTool params;

    public static class RunSubset implements Runnable {

        private int start = 0;
        private int end = 0;

        RunSubset(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public void run() {
            // set up the execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);

            if (params.has("left") && params.has("right")) {
                for (int i = start; i < end; i++) {
                    DataSet<DeviceRecord> l, r;
                    DataSet<DeviceRecord1> j;
                    DataSet<Tuple2<Integer, Integer>> c1, c2;

                    r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .filter(new MyFilter())
                        .setParallelism(1);

                    // read the text file from given input path
                    j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .leftOuterJoin(r)
                        .where("B")
                        .equalTo("B")
                        .with(new MyFlatJoinFunction()).setParallelism(1);

                    j.flatMap(new Mapper(false))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");

                    j.flatMap(new Mapper2(true))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
                }
            }

            try {
                System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
                env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
            } catch (Exception e) {
                System.err.println("env.execute exception: " + e.getMessage());
            }
        }
    }

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        int total_to_do = Integer.decode(params.get("filecount"));

        // number of threads should be <= number of slots
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);

        // assumes an even number of jobs
        for (int i = 0; i < total_to_do; i += JobsPerThread) {
            int end = i + JobsPerThread;
            if (end > total_to_do) {
                end = total_to_do;
            }
            executor.execute(new RunSubset(i, end));
        }
        executor.shutdown();

        // Many ways of waiting.
        try {
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            System.out.println("Execution interrupted");
            System.exit(-1);
        }

        // get input data
        DataSet<Tuple2<Integer, Integer>> counts;
        DataSet<Tuple2<Integer, Integer>> counts2;
        counts = env.readCsvFile(params.get("output"))
            .types(Integer.class, Integer.class);
        counts2 = env.readCsvFile(params.get("output2"))
            .types(Integer.class, Integer.class);

        // Count by C
        counts = counts
            .groupBy(0)
            .sum(1);
        // Count by device
        counts2 = counts2
            .groupBy(0)
            .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output3"), "\n", ", ");
        }
        // emit result
        if (params.has("output2")) {
            counts2.writeAsCsv(params.get("output4"), "\n", ", ");
        }

        // execute program
        env.execute("Union4b");
    }
}
{code}

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html
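A note on the failure mode, beyond what the report states: the top of the trace ({{java.util.HashMap.readObject}} failing with {{OptionalDataException}}) is consistent with a {{HashMap}} whose serialized form was written while another thread mutated it, so the recorded entry count disagrees with the entries actually present in the stream. The following standalone sketch (plain JDK serialization, no Flink involved; an illustration of the suspected mechanism, not the confirmed root cause) usually reproduces that class of failure within a few attempts:

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConcurrentSerializationDemo {

    public static void main(String[] args) throws Exception {
        // Deliberately unsynchronized map, shared between two threads.
        final Map<String, String> shared = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            shared.put("key" + i, "value" + i);
        }

        final AtomicBoolean stop = new AtomicBoolean(false);
        // Mutator thread keeps changing the map while it is being serialized.
        Thread mutator = new Thread(() -> {
            int i = 0;
            while (!stop.get()) {
                shared.put("extra" + i, "v");
                shared.remove("extra" + i);
                i++;
            }
        });
        mutator.start();

        for (int attempt = 0; attempt < 1_000; attempt++) {
            try {
                ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                    out.writeObject(shared); // may capture an inconsistent snapshot
                }
                try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                    in.readObject(); // a corrupt snapshot fails here
                }
            } catch (Exception e) {
                // Typical outcomes: ConcurrentModificationException on the write side,
                // StreamCorruptedException or OptionalDataException on the read side.
                System.out.println("attempt " + attempt + " failed: " + e);
            }
        }
        stop.set(true);
        mutator.join();
    }
}
{code}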
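Also not part of the original report, but possibly useful for anyone hitting this on 1.4.0: every {{RunSubset}} thread registers the same static {{params}} instance via {{setGlobalJobParameters}}, so a plausible workaround is to hand each thread its own copy. A minimal sketch follows; the class name {{RunSubsetIsolated}} is hypothetical, and it assumes the corruption stems from concurrently serializing the shared instance, which is what the linked pull request appears to address:

{code}
import java.util.HashMap;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;

// Hypothetical variant of RunSubset: each thread registers its own
// ParameterTool copy, so concurrent job submissions never serialize
// the same shared instance.
public class RunSubsetIsolated implements Runnable {

    private final ParameterTool privateParams;

    RunSubsetIsolated(ParameterTool shared) {
        // Defensive copy: snapshot the shared parameters into a fresh instance.
        this.privateParams = ParameterTool.fromMap(new HashMap<>(shared.toMap()));
    }

    @Override
    public void run() {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Register the per-thread copy instead of the shared static field.
        env.getConfig().setGlobalJobParameters(privateParams);
        // ... build and execute the per-thread job exactly as RunSubset does ...
    }
}
{code}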