Till Rohrmann created FLINK-7943:
------------------------------------

             Summary: OptionalDataException when launching Flink jobs concurrently
                 Key: FLINK-7943
                 URL: https://issues.apache.org/jira/browse/FLINK-7943
             Project: Flink
          Issue Type: Bug
          Components: Client
    Affects Versions: 1.4.0
            Reporter: Till Rohrmann


A user reported (see [1]) getting an {{OptionalDataException}} when launching 
multiple Flink jobs concurrently from the same program. The problem seems to 
appear when {{GlobalJobParameters}} are set. The stack trace is 
below:

{code}
Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
java.io.OptionalDataException
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at java.util.HashMap.readObject(HashMap.java:1407)
        at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
        at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

The user code that triggers the problem is below (note: some long lines in the listing are hard-wrapped):

{code}
@SuppressWarnings("serial")
public class UnionThreaded {

    // Number of worker threads in the fixed pool created by main().
    static int ThreadPoolSize = 3;
    // Number of input file indices handled by each RunSubset task (each task submits one job).
    static int JobsPerThread = 2;
    // Command-line parameters, parsed once in main() and read by every RunSubset thread.
    // NOTE(review): this single instance is shared by all threads and registered as the
    // GlobalJobParameters of every concurrently submitted job. The reported
    // OptionalDataException (thrown from HashMap.readObject while the JobManager deserializes
    // the job) is consistent with this object being mutated by one thread while another
    // serializes it -- confirm against ParameterTool's internals.
    static ParameterTool params;

    /**
     * Builds and submits one Flink batch job covering input file indices {@code [start, end)}.
     *
     * <p>For every index {@code i} in the range, the job reads {@code <left>/i} and
     * {@code <right>/i} as CSV into {@code DeviceRecord} POJOs (fields A, B, C). It filters the
     * right side with {@code MyFilter} and left-outer-joins the two on field B via
     * {@code MyFlatJoinFunction}. It then writes two "group by field 0, sum field 1" results to
     * {@code <output>/i} and {@code <output2>/i}. All indices are added to the same environment
     * and submitted by a single {@code env.execute("Union4a:<start>:<end>")} call.
     */
    public static class RunSubset implements Runnable {

        // First input file index handled by this task (inclusive).
        private int start = 0;
        // Index at which this task stops (exclusive).
        private int end = 0;


        RunSubset(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /**
         * Defines the pipelines for this subset and submits them as one job.
         *
         * <p>Any exception from {@code env.execute} is caught here. Only its message is printed to
         * stderr (the stack trace and cause are dropped), and the thread returns normally.
         */
        @Override
        public void run() {
            // set up the execution environment
            final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

            // make parameters available in the web interface
            // NOTE(review): `params` is the shared static instance. Every concurrently running
            // RunSubset registers the same object as its GlobalJobParameters and also calls
            // has()/get() on it below. If those calls touch ParameterTool's internal maps (not
            // visible here -- confirm), one thread could modify the object while another thread's
            // job is being serialized, matching the HashMap.readObject failure in the report.
            // A per-thread copy (e.g. ParameterTool.fromMap(params.toMap())) would likely isolate
            // the jobs.
            env.getConfig().setGlobalJobParameters(params);
            // Pipelines are only defined when both inputs are given; execute() below is still
            // called either way.
            if (params.has("left") && params.has("right")) {
                for (int i = start; i < end; i++) {
                    // NOTE(review): l, c1 and c2 are declared but never used.
                    DataSet<DeviceRecord> l, r;
                    DataSet<DeviceRecord1> j;
                    DataSet<Tuple2<Integer, Integer>> c1, c2;
                    // Right side: read <right>/i as DeviceRecord POJOs and filter it.
                    r = env.readCsvFile(params.get("right") + "/" + 
Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .filter(new MyFilter())
                        .setParallelism(1);
                    // Left side: read <left>/i and left-outer-join it with r on field "B".
                    j = env.readCsvFile(params.get("left") + "/" + 
Integer.toString(i))
                        .pojoType(DeviceRecord.class, "A", "B", "C")
                        .setParallelism(1)
                        .leftOuterJoin(r)
                        .where("B")
                        .equalTo("B")
                        .with(new MyFlatJoinFunction()).setParallelism(1);
                    // Two sinks per index: sum of field 1 grouped by field 0, written as CSV
                    // ("\n" row delimiter, "," field delimiter) to <output>/i and <output2>/i.
                    j.flatMap(new Mapper(false))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output") + "/" + 
Integer.toString(i), "\n", ",");
                    j.flatMap(new Mapper2(true))
                        .groupBy(0)
                        .sum(1).setParallelism(1)
                        .writeAsCsv(params.get("output2") + "/" + 
Integer.toString(i), "\n", ",");
                }
            }
            try {
                // NOTE(review): the "Calendar..." line below looks like the tail of this trailing
                // comment, wrapped by email. As written it is a standalone statement whose result
                // is discarded.
                System.out.println("calling env.execute()"); // + 
Calendar.getInstance().getTime();

                // Job name is "Union4a:<start>:<end>" (e.g. "Union4a:14:15" in the stack trace).
                env.execute("Union4a" + ":" + Integer.toString(start) + ":" + 
Integer.toString(end));
            } catch (Exception e) {
                // Swallowed: only the message is reported; main() cannot see this failure.
                System.err.println("env.execute exception: " + e.getMessage());
            }

        }

    }

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************
    /**
     * Runs the per-subset jobs concurrently, waits for all of them, then runs a final job
     * ("Union4b") that re-aggregates the two output datasets.
     *
     * <p>Parameters read from the command line: {@code filecount}, {@code output},
     * {@code output2}, {@code output3}, {@code output4} here, plus {@code left} and {@code right}
     * in the subset tasks.
     *
     * @param args command-line arguments, parsed with {@code ParameterTool.fromArgs}
     * @throws Exception from the Flink calls made here, e.g. the final {@code env.execute}
     */
    public static void main(String[] args) throws Exception {

        // Parsed once and stored in the static field that every RunSubset thread also reads.
        params = ParameterTool.fromArgs(args);
        // set up the execution environment
        final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // Integer.decode also accepts "0x"/"#" hex and leading-zero octal (e.g. "010" == 8).
        int total_to_do = Integer.decode(params.get("filecount"));
        // number of threads should be <= number of slots
        ThreadPoolExecutor executor = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(ThreadPoolSize);
        // One task per JobsPerThread indices. The clamp below shortens the last subset when
        // filecount is not a multiple of JobsPerThread, so an odd count also works.
        for (int i = 0; i < total_to_do; i += JobsPerThread) {
            int end = i + JobsPerThread;
            if (end > total_to_do) {
                end = total_to_do;
            }
            executor.execute(new RunSubset(i, end));
        }
        // Stop accepting tasks, then block (effectively without limit) until all have finished.
        executor.shutdown();
        // Many ways of waiting.
        try {
            // NOTE(review): the boolean result is ignored. Subset job failures are caught inside
            // RunSubset, so execution continues to the final job even if some subsets failed.
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            System.out.println("Execution interrupted");
            System.exit(-1);
        }

        // get input data
        DataSet<Tuple2<Integer, Integer>> counts;
        DataSet<Tuple2<Integer, Integer>> counts2;

        // NOTE(review): the subset jobs wrote to <output>/<i> and <output2>/<i>. Whether reading
        // the parent path picks up those nested directories depends on Flink's file-enumeration
        // settings -- confirm.
        counts = env.readCsvFile(params.get("output"))
            .types(Integer.class, Integer.class);
        counts2 = env.readCsvFile(params.get("output2"))
            .types(Integer.class, Integer.class);
        // Count by C
        counts = counts
            .groupBy(0)
            .sum(1);
        // Count by device
        counts2 = counts2
            .groupBy(0)
            .sum(1);

        // emit result
        // NOTE(review): the guard checks "output" but the sink writes to "output3" (likewise
        // "output2" guards "output4"). Verify this is intended.
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output3"), "\n", ", ");
        }
        // emit result
        if (params.has("output2")) {
            counts2.writeAsCsv(params.get("output4"), "\n", ", ");
        }
        // execute program
        env.execute("Union4b");

    }
{code}

[1] Original mailing-list thread: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to