[ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224909#comment-16224909 ]

ASF GitHub Bot commented on FLINK-7943:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4921

    [FLINK-7943] Make ParameterTool thread safe

    ## What is the purpose of the change
    
    This commit changes the serialization of the `ParameterTool` so that only the
    `data` map is included. The `defaultData` and `unrequestedParameters` maps are
    not serialized because they are only used on the client side. Additionally,
    the `defaultData` and `unrequestedParameters` maps are made thread safe by
    using `ConcurrentHashMap`s.
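    
    A minimal sketch of the resulting shape (illustrative only; the field names
    follow the description above, the actual code in the PR may differ):
    
        import java.io.*;
        import java.util.*;
        import java.util.concurrent.ConcurrentHashMap;
        
        // Sketch: only `data` survives Java serialization; the client-side
        // maps are transient and rebuilt as concurrent collections on read.
        public class ParameterToolSketch implements Serializable {
            private static final long serialVersionUID = 1L;
        
            protected final Map<String, String> data = new ConcurrentHashMap<>();
            protected transient Map<String, String> defaultData = new ConcurrentHashMap<>();
            protected transient Set<String> unrequestedParameters =
                    Collections.newSetFromMap(new ConcurrentHashMap<>());
        
            private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
                in.defaultReadObject(); // restores only `data`
                defaultData = new ConcurrentHashMap<>();
                unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>());
            }
        }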
    
    ## Verifying this change
    
    - Added `ParameterToolTest#testConcurrentExecutionConfigSerialization`
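    
    A hypothetical sketch of what such a concurrency test can exercise (the
    actual test lives in `ParameterToolTest`; `InstantiationUtil` is Flink's
    existing serialization helper):
    
        import java.util.*;
        import java.util.concurrent.*;
        import org.apache.flink.api.java.utils.ParameterTool;
        import org.apache.flink.util.InstantiationUtil;
        
        public class ConcurrentSerializationCheck {
            public static void main(String[] args) throws Exception {
                ExecutorService pool = Executors.newFixedThreadPool(4);
                ParameterTool parameters = ParameterTool.fromArgs(new String[] {"--key", "value"});
                List<Future<byte[]>> results = new ArrayList<>();
                for (int i = 0; i < 4; i++) {
                    results.add(pool.submit(() -> {
                        parameters.get("key"); // touches client-side bookkeeping concurrently
                        return InstantiationUtil.serializeObject(parameters);
                    }));
                }
                for (Future<byte[]> result : results) {
                    result.get(); // fails if concurrent serialization corrupted the stream
                }
                pool.shutdown();
            }
        }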
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink copyParameterTool

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4921.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4921
    
----

> OptionalDataException when launching Flink jobs concurrently
> ------------------------------------------------------------
>
>                 Key: FLINK-7943
>                 URL: https://issues.apache.org/jira/browse/FLINK-7943
>             Project: Flink
>          Issue Type: Bug
>          Components: Client
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> A user reported getting an {{OptionalDataException}} when launching multiple 
> Flink jobs from the same program concurrently. The problem seems to appear 
> when the {{GlobalJobParameters}} are set. The stack trace can be found below:
> {code}
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at java.util.HashMap.readObject(HashMap.java:1407)
>         at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>         at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The user code causing the problem is:
> {code}
> @SuppressWarnings("serial")
> public class UnionThreaded {
>     static int ThreadPoolSize = 3;
>     static int JobsPerThread = 2;
>     static ParameterTool params;
>     public static class RunSubset implements Runnable {
>         private int start = 0;
>         private int end = 0;
>         RunSubset(int start, int end) {
>             this.start = start;
>             this.end = end;
>         }
>         @Override
>         public void run() {
>             // set up the execution environment
>             final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             // make parameters available in the web interface
>             env.getConfig().setGlobalJobParameters(params);
>             if (params.has("left") && params.has("right")) {
>                 for (int i = start; i < end; i++) {
>                     DataSet<DeviceRecord> l, r;
>                     DataSet<DeviceRecord1> j;
>                     DataSet<Tuple2<Integer, Integer>> c1, c2;
>                     r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .filter(new MyFilter())
>                         .setParallelism(1);
>                     // read the text file from given input path
>                     j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .leftOuterJoin(r)
>                         .where("B")
>                         .equalTo("B")
>                         .with(new MyFlatJoinFunction()).setParallelism(1);
>                     j.flatMap(new Mapper(false))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");
>                     j.flatMap(new Mapper2(true))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
>                 }
>             }
>             try {
>                 System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
>                 env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
>             } catch (Exception e) {
>                 System.err.println("env.execute exception: " + e.getMessage());
>             }
>         }
>     }
>     // *************************************************************************
>     //     PROGRAM
>     // *************************************************************************
>     public static void main(String[] args) throws Exception {
>         params = ParameterTool.fromArgs(args);
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>         int total_to_do = Integer.decode(params.get("filecount"));
>         // number of threads should be <= number of slots
>         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);
>         // assumes an even number of jobs
>         for (int i = 0; i < total_to_do; i += JobsPerThread) {
>             int end = i + JobsPerThread;
>             if (end > total_to_do) {
>                 end = total_to_do;
>             }
>             executor.execute(new RunSubset(i, end));
>         }
>         executor.shutdown();
>         // Many ways of waiting.
>         try {
>             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
>         } catch (InterruptedException e) {
>             System.out.println("Execution interrupted");
>             System.exit(-1);
>         }
>         // get input data
>         DataSet<Tuple2<Integer, Integer>> counts;
>         DataSet<Tuple2<Integer, Integer>> counts2;
>         counts = env.readCsvFile(params.get("output"))
>             .types(Integer.class, Integer.class);
>         counts2 = env.readCsvFile(params.get("output2"))
>             .types(Integer.class, Integer.class);
>         // Count by C
>         counts = counts
>             .groupBy(0)
>             .sum(1);
>         // Count by device
>         counts2 = counts2
>             .groupBy(0)
>             .sum(1);
>         // emit result
>         if (params.has("output")) {
>             counts.writeAsCsv(params.get("output3"), "\n", ", ");
>         }
>         // emit result
>         if (params.has("output2")) {
>             counts2.writeAsCsv(params.get("output4"), "\n", ", ");
>         }
>         // execute program
>         env.execute("Union4b");
>     }
> }
> {code}
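> A possible per-thread workaround sketch (not part of the report; {{ParameterTool.fromMap}} and {{toMap}} are existing APIs): give every thread its own copy of the parameters so the shared instance is never serialized concurrently.
> {code}
> // Inside RunSubset#run(), instead of registering the shared instance:
> ParameterTool localParams = ParameterTool.fromMap(params.toMap());
> env.getConfig().setGlobalJobParameters(localParams);
> {code}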
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html


