[ https://issues.apache.org/jira/browse/FLINK-7943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16234191#comment-16234191 ]

ASF GitHub Bot commented on FLINK-7943:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4921#discussion_r148286896
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/RequiredParameters.java ---
    @@ -83,82 +82,100 @@ public void add(Option option) throws RequiredParametersException {
         * <p>If any check fails, a RequiredParametersException is thrown
         *
         * @param parameterTool - parameters supplied by the user.
    +    * @return New ParameterTool instance with default values set
         * @throws RequiredParametersException if any of the specified checks fail
         */
    -   public void applyTo(ParameterTool parameterTool) throws RequiredParametersException {
    +   public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
                List<String> missingArguments = new LinkedList<>();
    +
    +           ParameterTool resultParameterTool = parameterTool;
                for (Option o : data.values()) {
    -                   if (parameterTool.data.containsKey(o.getName())) {
    -                           if (Objects.equals(parameterTool.data.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
    +                   if (resultParameterTool.has(o.getName())) {
    +                           if (Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
                                        // the parameter has been passed, but no value, check if there is a default value
    -                                   checkAndApplyDefaultValue(o, parameterTool.data);
    +                                   resultParameterTool = checkAndApplyDefaultValue(o, resultParameterTool);
                                } else {
                                        // a value has been passed in the parameterTool, now check if it adheres to all constraints
    -                                   checkAmbiguousValues(o, parameterTool.data);
    -                                   checkIsCastableToDefinedType(o, parameterTool.data);
    -                                   checkChoices(o, parameterTool.data);
    +                                   checkAmbiguousValues(o, resultParameterTool);
    +                                   checkIsCastableToDefinedType(o, resultParameterTool);
    +                                   checkChoices(o, resultParameterTool);
                                }
                        } else {
                                // check if there is a default name or a value passed for a possibly defined alternative name.
    -                           if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, parameterTool.data)) {
    +                           resultParameterTool = synchronizeAlternativeName(o, resultParameterTool);
    +
    +                           if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
                                        missingArguments.add(o.getName());
                                }
                        }
                }
                if (!missingArguments.isEmpty()) {
                        throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
                }
    +
    +           return resultParameterTool;
        }
     
        // check if the given parameter has a default value and add it to the passed map if that is the case
        // else throw an exception
    -   private void checkAndApplyDefaultValue(Option o, Map<String, String> data) throws RequiredParametersException {
    -           if (hasNoDefaultValueAndNoValuePassedOnAlternativeName(o, data)) {
    +   private ParameterTool checkAndApplyDefaultValue(Option o, ParameterTool parameterTool) throws RequiredParametersException {
    +           final ParameterTool resultParameterTool = synchronizeAlternativeName(o, parameterTool);
    +
    +           if (!resultParameterTool.has(o.getName()) || Objects.equals(resultParameterTool.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
                        throw new RequiredParametersException("No default value for undefined parameter " + o.getName());
    +           } else {
    +                   return resultParameterTool;
                }
        }
     
        // check if the value in the given map which corresponds to the name of the given option
        // is castable to the type of the option (if any is defined)
    -   private void checkIsCastableToDefinedType(Option o, Map<String, String> data) throws RequiredParametersException {
    -           if (o.hasType() && !o.isCastableToDefinedType(data.get(o.getName()))) {
    +   private void checkIsCastableToDefinedType(Option o, ParameterTool parameterTool) throws RequiredParametersException {
    +           if (o.hasType() && !o.isCastableToDefinedType(parameterTool.get(o.getName()))) {
                        throw new RequiredParametersException("Value for parameter " + o.getName() +
                                        " cannot be cast to type " + o.getType());
                }
        }
     
        // check if the value in the given map which corresponds to the name of the given option
        // adheres to the list of given choices for the param in the options (if any are defined)
    -   private void checkChoices(Option o, Map<String, String> data) throws RequiredParametersException {
    -           if (o.getChoices().size() > 0 && !o.getChoices().contains(data.get(o.getName()))) {
    -                   throw new RequiredParametersException("Value " + data.get(o.getName()) +
    +   private void checkChoices(Option o, ParameterTool parameterTool) throws RequiredParametersException {
    +           if (o.getChoices().size() > 0 && !o.getChoices().contains(parameterTool.get(o.getName()))) {
    +                   throw new RequiredParametersException("Value " + parameterTool.get(o.getName()) +
                                        " is not in the list of valid choices for key " + o.getName());
                }
        }
     
        // move value passed on alternative name to standard name or apply default value if any defined
    -   // else return true to indicate parameter is 'really' missing
    -   private boolean hasNoDefaultValueAndNoValuePassedOnAlternativeName(Option o, Map<String, String> data)
    -                   throws RequiredParametersException {
    -           if (o.hasAlt() && data.containsKey(o.getAlt())) {
    -                   data.put(o.getName(), data.get(o.getAlt()));
    +   // if any change was applied, then this method returns a new ParameterTool instance with these
    +   // changes. If not, then the passed ParameterTool instance will be returned.
    +   private ParameterTool synchronizeAlternativeName(Option o, ParameterTool parameterTool) {
    +           // TODO: Throw this all away!!!
    +           if (o.hasAlt() && parameterTool.has(o.getAlt())) {
    +                   HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
    +                   newData.put(o.getName(), parameterTool.get(o.getAlt()));
    +
    +                   return ParameterTool.fromMap(newData);
                } else {
                        if (o.hasDefaultValue()) {
    -                           data.put(o.getName(), o.getDefaultValue());
    +                           HashMap<String, String> newData = new HashMap<>(parameterTool.toMap());
    --- End diff --
    
    could we not instead create a copy at the start of applyTo and modify that?
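
    For illustration, a minimal sketch of what that suggestion could look like: copy the underlying map once at the start of {{applyTo}}, let the private check* helpers mutate that single copy in place (with the {{Map<String, String>}} signatures the pre-change code already used), and build exactly one new {{ParameterTool}} at the end. This is a sketch of the reviewer's idea, not the committed implementation; it assumes {{ParameterTool.toMap()}} and {{ParameterTool.fromMap(...)}} behave as they are used in the diff above.

{code}
// Sketch only: one defensive copy at the start, mutated in place by the
// Map-based helpers, one new ParameterTool built at the end.
public ParameterTool applyTo(ParameterTool parameterTool) throws RequiredParametersException {
	List<String> missingArguments = new LinkedList<>();

	// single copy of the user-supplied parameters; all alternative-name moves
	// and default values are written into this map
	Map<String, String> newData = new HashMap<>(parameterTool.toMap());

	for (Option o : data.values()) {
		if (newData.containsKey(o.getName())) {
			if (Objects.equals(newData.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
				checkAndApplyDefaultValue(o, newData);   // mutates newData, throws if no default
			} else {
				checkAmbiguousValues(o, newData);
				checkIsCastableToDefinedType(o, newData);
				checkChoices(o, newData);
			}
		} else {
			synchronizeAlternativeName(o, newData);      // mutates newData
			if (!newData.containsKey(o.getName())
					|| Objects.equals(newData.get(o.getName()), ParameterTool.NO_VALUE_KEY)) {
				missingArguments.add(o.getName());
			}
		}
	}
	if (!missingArguments.isEmpty()) {
		throw new RequiredParametersException(this.missingArgumentsText(missingArguments), missingArguments);
	}

	// exactly one new instance, instead of one per modified option
	return ParameterTool.fromMap(newData);
}
{code}

    This would avoid the repeated {{toMap()}}/{{fromMap()}} round trip that {{synchronizeAlternativeName}} in the diff performs for every option with an alternative name or a default value.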


> OptionalDataException when launching Flink jobs concurrently
> ------------------------------------------------------------
>
>                 Key: FLINK-7943
>                 URL: https://issues.apache.org/jira/browse/FLINK-7943
>             Project: Flink
>          Issue Type: Bug
>          Components: Client
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>
> A user reported that he is getting an {{OptionalDataException}} when he
> launches multiple Flink jobs from the same program concurrently. The problem
> seems to appear if one sets the {{GlobalJobParameters}}. The stack trace can
> be found below:
> {code}
> Failed to submit job 60f4da5cf76836fe52ceba5cebdae412 (Union4a:14:15)
> java.io.OptionalDataException
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1588)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at java.util.HashMap.readObject(HashMap.java:1407)
>         at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2173)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
>         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>         at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>         at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1283)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>         at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>         at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}
> The user code causing the problem is:
> {code}
> @SuppressWarnings("serial")
> public class UnionThreaded {
>     static int ThreadPoolSize = 3;
>     static int JobsPerThread = 2;
>     static ParameterTool params;
>     public static class RunSubset implements Runnable {
>         private int start = 0;
>         private int end = 0;
>         RunSubset(int start, int end) {
>             this.start = start;
>             this.end = end;
>         }
>         @Override
>         public void run() {
>             // set up the execution environment
>             final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>             // make parameters available in the web interface
>             env.getConfig().setGlobalJobParameters(params);
>             if (params.has("left") && params.has("right")) {
>                 for (int i = start; i < end; i++) {
>                     DataSet<DeviceRecord> l, r;
>                     DataSet<DeviceRecord1> j;
>                     DataSet<Tuple2<Integer, Integer>> c1, c2;
>                     r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .filter(new MyFilter())
>                         .setParallelism(1);
>                     // read the text file from given input path
>                     j = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
>                         .pojoType(DeviceRecord.class, "A", "B", "C")
>                         .setParallelism(1)
>                         .leftOuterJoin(r)
>                         .where("B")
>                         .equalTo("B")
>                         .with(new MyFlatJoinFunction()).setParallelism(1);
>                     j.flatMap(new Mapper(false))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output") + "/" + Integer.toString(i), "\n", ",");
>                     j.flatMap(new Mapper2(true))
>                         .groupBy(0)
>                         .sum(1).setParallelism(1)
>                         .writeAsCsv(params.get("output2") + "/" + Integer.toString(i), "\n", ",");
>                 }
>             }
>             try {
>                 System.out.println("calling env.execute()"); // + Calendar.getInstance().getTime();
>                 env.execute("Union4a" + ":" + Integer.toString(start) + ":" + Integer.toString(end));
>             } catch (Exception e) {
>                 System.err.println("env.execute exception: " + e.getMessage());
>             }
>         }
>     }
>     // *************************************************************************
>     //     PROGRAM
>     // *************************************************************************
>     public static void main(String[] args) throws Exception {
>         params = ParameterTool.fromArgs(args);
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>         int total_to_do = Integer.decode(params.get("filecount"));
>         // number of threads should be <= number of slots
>         ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(ThreadPoolSize);
>         // assumes an even number of jobs
>         for (int i = 0; i < total_to_do; i += JobsPerThread) {
>             int end = i + JobsPerThread;
>             if (end > total_to_do) {
>                 end = total_to_do;
>             }
>             executor.execute(new RunSubset(i, end));
>         }
>         executor.shutdown();
>         // Many ways of waiting.
>         try {
>             executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
>         } catch (InterruptedException e) {
>             System.out.println("Execution interrupted");
>             System.exit(-1);
>         }
>         // get input data
>         DataSet<Tuple2<Integer, Integer>> counts;
>         DataSet<Tuple2<Integer, Integer>> counts2;
>         counts = env.readCsvFile(params.get("output"))
>             .types(Integer.class, Integer.class);
>         counts2 = env.readCsvFile(params.get("output2"))
>             .types(Integer.class, Integer.class);
>         // Count by C
>         counts = counts
>             .groupBy(0)
>             .sum(1);
>         // Count by device
>         counts2 = counts2
>             .groupBy(0)
>             .sum(1);
>         // emit result
>         if (params.has("output")) {
>             counts.writeAsCsv(params.get("output3"), "\n", ", ");
>         }
>         // emit result
>         if (params.has("output2")) {
>             counts2.writeAsCsv(params.get("output4"), "\n", ", ");
>         }
>         // execute program
>         env.execute("Union4b");
>     }
> }
> {code}
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Execute-multiple-jobs-in-parallel-threading-java-io-OptionalDataException-td16441.html
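
The failure point in the stack trace above is {{HashMap.readObject}} while the JobManager deserializes the {{SerializedValue}} that appears to carry the {{ExecutionConfig}} with its {{GlobalJobParameters}}. That is consistent with the shared static {{params}} instance being serialized for one submission while another thread is still using it: Java serialization of a {{HashMap}} that is concurrently modified can produce an internally inconsistent stream that only fails on the reading side. A minimal stand-alone sketch of that race (plain JDK serialization, no Flink; the class name and loop bounds are illustrative, not from the report):

{code}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustrative reproduction: serialize a HashMap while another thread mutates it.
// Depending on timing this fails during writeObject (often with a
// ConcurrentModificationException) or, as in the report, only on the reading
// side (OptionalDataException / StreamCorruptedException from HashMap.readObject).
public class SharedMapSerializationRace {
    public static void main(String[] args) throws Exception {
        Map<String, String> shared = new HashMap<>();
        for (int i = 0; i < 10_000; i++) {
            shared.put("key-" + i, "value-" + i);
        }

        // stands in for user code touching the shared ParameterTool concurrently
        Thread mutator = new Thread(() -> {
            for (int i = 0; ; i++) {
                shared.put("extra-" + (i % 1_000), "v");
                shared.remove("extra-" + ((i + 500) % 1_000));
            }
        });
        mutator.setDaemon(true);
        mutator.start();

        // stands in for concurrent job submissions serializing GlobalJobParameters
        for (int attempt = 0; attempt < 1_000; attempt++) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                    out.writeObject(shared);
                }
                try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                    in.readObject();
                }
            } catch (Exception e) {
                System.out.println("attempt " + attempt + " failed: " + e);
                return;
            }
        }
        System.out.println("no corruption observed; the race is timing-dependent");
    }
}
{code}

This also lines up with the direction of the pull request discussed above, which makes {{RequiredParameters.applyTo}} return a new {{ParameterTool}} instead of mutating the instance it was given.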


