GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/664

    [FLINK-1525][FEEDBACK] Introduction of a small input parameter parsing utility

    Hi,
    Last week I was running a bunch of Flink Streaming jobs on a cluster. One of the jobs had 8 arguments, which I changed in different iterations of the program.
    I ended up passing arguments like
    ```
    16 1 8 3 10k hdp22-w-1.c.internal:6667,hdp22-w-0.c.internal:6667,hdp22-m.c.internal:6667 10000
    ```
    It's obvious that this is not easily maintainable.
    In addition to this experience, I got similar feedback from at least two other Flink users.
    
    Therefore, I sat down and implemented a simple class which allows users to work with input parameters in a hassle-free manner.
    The tool is called **ParameterUtil**. It can be initialized from:
    - regular command line arguments (`-` and `--`): `ParameterUtil.fromArgs(new String[]{"--berlin"});`
    - `.properties` files: `ParameterUtil.fromPropertiesFile(propertiesFile);`
    - system properties (`-D` arguments to the JVM): `ParameterUtil.fromSystemProperties();`
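    To make the `-`/`--` convention concrete, here is a minimal, hypothetical sketch of how such an argument parser could map tokens to keys. It is not the code in this PR: the class `ArgsSketch`, the `NO_VALUE` placeholder for value-less flags such as `--berlin`, and the rule that a token like `-1` counts as a value rather than a key are all assumptions made for illustration.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch, not the PR's implementation: parses "-key value"
    // and "--key value" pairs into a map. A key followed by another key, or
    // by nothing, is stored as a flag without a value.
    final class ArgsSketch {

        // Assumed placeholder for flags such as "--berlin" that carry no value.
        static final String NO_VALUE = "__NO_VALUE__";

        static Map<String, String> parse(String[] args) {
            Map<String, String> params = new HashMap<>();
            int i = 0;
            while (i < args.length) {
                if (!isKey(args[i])) {
                    throw new IllegalArgumentException(
                            "Expected a key starting with '-' or '--', got: " + args[i]);
                }
                String key = args[i].startsWith("--") ? args[i].substring(2) : args[i].substring(1);
                // The next token is this key's value unless it is itself a key.
                if (i + 1 < args.length && !isKey(args[i + 1])) {
                    params.put(key, args[i + 1]);
                    i += 2;
                } else {
                    params.put(key, NO_VALUE);
                    i += 1;
                }
            }
            return params;
        }

        // A token is a key if it starts with '-' and is not a negative number like "-1".
        private static boolean isKey(String token) {
            return token.startsWith("-") && !token.matches("-\\d.*");
        }
    }
    ```

    Because every value is bound to a named key, the order of the arguments no longer matters, which addresses the maintainability problem of the positional argument list shown earlier.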
    
    I'm also planning to provide an initializer which accepts the same arguments as Hadoop's GenericOptionsParser: https://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/util/GenericOptionsParser.html (our users are just too used to Hadoop's tooling).
    
    For accessing arguments, it has methods like `parameter.getRequired("input")`, `parameter.get("output", "myDefaultValue")`, `parameter.getLong("expectedCount", -1L)` and so on.
    
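    The accessor semantics could look roughly like the following sketch. `ParamsSketch` is a hypothetical class wrapping an already-parsed key/value map; the method names mirror the ones listed above, but the exact behavior (an exception for a missing required key, `Long.parseLong` for numeric values) is assumed rather than taken from the PR.

    ```java
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical accessor sketch over an already-parsed key/value map.
    final class ParamsSketch {
        private final Map<String, String> data;

        ParamsSketch(Map<String, String> data) {
            this.data = new HashMap<>(data); // defensive copy
        }

        // Fails fast with a descriptive message instead of returning null.
        String getRequired(String key) {
            String value = data.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for required key '" + key + "'");
            }
            return value;
        }

        // Falls back to the supplied default when the key is absent.
        String get(String key, String defaultValue) {
            String value = data.get(key);
            return value == null ? defaultValue : value;
        }

        // Parses the value as a long; a malformed value raises NumberFormatException.
        long getLong(String key, long defaultValue) {
            String value = data.get(key);
            return value == null ? defaultValue : Long.parseLong(value);
        }
    }
    ```

    Failing fast in `getRequired` means a forgotten argument is reported by name at startup instead of surfacing later as a `NullPointerException` inside the job.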
    Also, I added a method to export the parameters to Flink's `Configuration` class:
    ```java
    Configuration config = parameter.getConfiguration();
    long expectedCount = config.getLong("expectedCount", -1L);
    ```
    This allows users to pass the input arguments to operators in the APIs:
    ```java
    text.flatMap(new Tokenizer()).withParameters(config);
    ```
    
    `ParameterUtil` itself is `Serializable`, so it can be passed into user functions (for example, to the `Tokenizer`).
    Also, I extended the `ExecutionConfig` to allow passing a `UserConfig` with custom stuff inside it.
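    Being `Serializable` matters because Flink ships user function objects to the workers using Java serialization, so every field a function holds must survive a serialize/deserialize round trip. The sketch below simulates that round trip with plain `java.io`; `SerializableParams`, `NormalizingFunction` and `SerializationRoundTrip` are hypothetical stand-ins, not classes from this PR or from Flink.

    ```java
    import java.io.*;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical parameter holder standing in for a Serializable parameter utility.
    final class SerializableParams implements Serializable {
        private static final long serialVersionUID = 1L;
        private final HashMap<String, String> data; // HashMap is itself Serializable

        SerializableParams(Map<String, String> data) {
            this.data = new HashMap<>(data);
        }

        String get(String key, String defaultValue) {
            return data.getOrDefault(key, defaultValue);
        }
    }

    // Hypothetical user function that captures the parameters as a field.
    final class NormalizingFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SerializableParams params;

        NormalizingFunction(SerializableParams params) {
            this.params = params;
        }

        float normalization() {
            return Float.parseFloat(params.get("normalization", "0.15"));
        }
    }

    final class SerializationRoundTrip {
        // Writes an object to bytes and reads it back, roughly what happens
        // when a function object is shipped from the client to the workers.
        @SuppressWarnings("unchecked")
        static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        }
    }
    ```

    If the holder were not `Serializable`, `writeObject` would throw a `NotSerializableException`, which is the kind of failure a non-serializable parameter object captured by a user function would cause.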
    
    `ParameterUtil` implements the `UserConfig` interface, so users can do the following:
    
    ```java
    public static void main(String[] args) throws Exception {
      ParameterUtil pt = ParameterUtil.fromArgs(args);
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // register the parsed parameters as the job's user config
      env.getConfig().setUserConfig(pt);
      // ... regular Flink stuff ...
    }
    ```
    Inside a (rich) user function, users can access the command line arguments:
    ```java
    text.flatMap(new Tokenizer()).flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // fetch the user config registered in main() and cast it back
            ExecutionConfig.UserConfig uc = getRuntimeContext().getExecutionConfig().getUserConfig();
            ParameterUtil pt = (ParameterUtil) uc;
            float norm = pt.getFloat("normalization", 0.15f);
            // ... use norm and emit results via out.collect(...) ...
        }
    });
    ```
    
    The `UserConfig` allows exporting key/value pairs to the web interface. Running WordCount:
    ```
    /bin/flink run ./examples/flink-java-examples-0.9-SNAPSHOT-WordCount.jar --input /home/robert/incubator-flink/build-target/README.txt --output /tmp/wo
    ```
    leads to the following result in the web interface:
    
    
![paramutil](https://cloud.githubusercontent.com/assets/89049/7550566/14ea36c2-f667-11e4-9a81-ee6a017527b0.png)
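    The registration, lookup and key/value export described above can be modeled without Flink. In this hypothetical sketch, `UserConfigSketch` stands in for `ExecutionConfig.UserConfig` and is assumed to expose its contents through a single `toMap()` method that a frontend renders; `ExecutionConfigSketch`, `ParamsConfig` and `WebTableSketch` are likewise illustrative names, not Flink API.

    ```java
    import java.io.Serializable;
    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical stand-in for ExecutionConfig.UserConfig; assumes the
    // interface exposes its contents as a key/value map for display.
    interface UserConfigSketch extends Serializable {
        Map<String, String> toMap();
    }

    // Hypothetical stand-in for the ExecutionConfig holder.
    final class ExecutionConfigSketch {
        private UserConfigSketch userConfig;

        void setUserConfig(UserConfigSketch userConfig) { this.userConfig = userConfig; }

        UserConfigSketch getUserConfig() { return userConfig; }
    }

    // Hypothetical parameter holder implementing the interface.
    final class ParamsConfig implements UserConfigSketch {
        private static final long serialVersionUID = 1L;
        private final LinkedHashMap<String, String> data = new LinkedHashMap<>();

        ParamsConfig with(String key, String value) {
            data.put(key, value);
            return this;
        }

        float getFloat(String key, float defaultValue) {
            String value = data.get(key);
            return value == null ? defaultValue : Float.parseFloat(value);
        }

        @Override
        public Map<String, String> toMap() {
            // Read-only view, so a frontend can render it without mutating it.
            return Collections.unmodifiableMap(data);
        }
    }

    final class WebTableSketch {
        // Renders the exported pairs as "key: value" lines in insertion order.
        static String render(UserConfigSketch config) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : config.toMap().entrySet()) {
                sb.append(e.getKey()).append(": ").append(e.getValue()).append('\n');
            }
            return sb.toString();
        }
    }
    ```

    Using an insertion-ordered `LinkedHashMap` keeps the rendered rows in the order the parameters were added; that ordering is a choice made for the sketch, not a documented property of the web interface.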
    
    
    Before I add this to all examples, I would like to get some feedback on the API choices I made (I don't want to change all examples afterwards ;) ).
    WordCount currently looks like this:
    ```java
    public static void main(String[] args) throws Exception {
        ParameterUtil pt = ParameterUtil.fromArgs(args);
        boolean fileOutput = pt.getNumberOfParameters() == 2;
        String textPath = null;
        String outputPath = null;
        if (fileOutput) {
            textPath = pt.getRequired("input");
            outputPath = pt.getRequired("output");
        }

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setUserConfig(pt);

        // create initial DataSet, containing the text lines.
        DataSet<String> text;
        if (fileOutput) {
            text = env.readTextFile(textPath);
        } else {
            // get default test text data
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);

        // ... emit the result and execute the program (omitted here) ...
    }
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink1525

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/664.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #664
    
----
commit 95a55eb7c0acabebf9ac0decfb669f3da0b514b1
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:49:33Z

    Initial draft

commit e801dcd5e6f43139f748d20468129126e409c4f4
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-07T20:51:27Z

    wip

commit 7c92b11c0c02faa25c645a5c74c00e83c73b7492
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T09:58:29Z

    wip

commit edf6aef4368d7589d87d515572334bbc9c4f7a99
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T14:29:58Z

    integrated into web frontend

commit 8bdc8f092bfa6de2e26c36d0dfb976a3e9d59c89
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-08T16:33:34Z

    wip

commit c339a5e98dd57fbb6753f9db10c15db3737e02c8
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:41:20Z

    travis, give me some feedback

commit dd1f5029fa0efb792bdab94f54bca3c61a9d0f32
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T11:45:57Z

    starting to rework the examples

commit 6f03b1ad054a6346996f3b148e09e0b3101588d7
Author: Robert Metzger <rmetz...@apache.org>
Date:   2015-05-09T13:53:05Z

    wip

----

