Till Rohrmann created FLINK-7943: ------------------------------------ Summary: OptionalDataException when launching Flink jobs concurrently Key: FLINK-7943 URL: https://issues.apache.org/jira/browse/FLINK-7943 Project: Flink Issue Type: Bug Components: Client Affects Versions: 1.4.0 Reporter: Till Rohrmann
A user reported that he is getting an {{OptionalDataException}} when he launches multiple Flink jobs from the same program concurrently. The problem seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can be found below: {code} Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15) java.io.OptionalDataException at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at java.util.HashMap.readObject(HashMap.java:1407) at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283) 
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:467) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} The user code causing the problem is: {code} @SuppressWarnings("serial") public class UnionThreaded { static int ThreadPoolSize = 3; static int JobsPerThread = 2; static ParameterTool params; public static class RunSubset implements Runnable { private int start = 0; private int end = 0; RunSubset(int start, int end) { this.start = start; this.end = end; } @Override public void run() { // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // make 
parameters available in the web interface env.getConfig().setGlobalJobParameters(params); if (params.has("left") && params.has("right")) { for (int i = start; i < end; i++) { DataSet<DeviceRecord> l, r; DataSet<DeviceRecord1> j; DataSet<Tuple2<Integer, Integer>> c1, c2; r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i)) .pojoType(DeviceRecord.class, "A", "B", "C") .setParallelism(1) .filter(new MyFilter()) .setParallelism(1); // read the text file from given input path j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i)) .pojoType(DeviceRecord.class, "A", "B", "C") .setParallelism(1) .leftOuterJoin(r) .where("B") .equalTo("B") .with(new MyFlatJoinFunction()).setParallelism(1); j.flatMap(new Mapper(false)) .groupBy(0) .sum(1).setParallelism(1) .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ","); j.flatMap(new Mapper2(true)) .groupBy(0) .sum(1).setParallelism(1) .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ","); } } try { System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime(); env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end)); } catch (Exception e) { System.err.println("env.execute exception: " + e.getMessage()); } } } // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { params = ParameterTool.fromArgs(args); // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); int total_to_do = Integer.decode(params.get("filecount")); // number of threads should be <= number of slots ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize); // assumes an even number of jobs for 
(int i = 0; i < total_to_do; i += JobsPerThread) { int end = i + JobsPerThread; if (end > total_to_do) { end = total_to_do; } executor.execute(new RunSubset(i, end)); } executor.shutdown(); // Many ways of waiting. try { executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); } catch (InterruptedException e) { System.out.println("Execution interrupted"); System.exit(-1); } // get input data DataSet<Tuple2<Integer, Integer>> counts; DataSet<Tuple2<Integer, Integer>> counts2; counts = env.readCsvFile(params.get("output")) .types(Integer.class, Integer.class); counts2 = env.readCsvFile(params.get("output2")) .types(Integer.class, Integer.class); // Count by C counts = counts .groupBy(0) .sum(1); // Count by device counts2 = counts2 .groupBy(0) .sum(1); // emit result if (params.has("output")) { counts.writeAsCsv(params.get("output3"), "\n", ", "); } // emit result if (params.has("output2")) { counts2.writeAsCsv(params.get("output4"), "\n", ", "); } // execute program env.execute("Union4b"); } {code} [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html -- This message was sent by Atlassian JIRA (v6.4.14#64029)