I would discourage from using the GlobalJobParameters. I think their
main purpose was to display configuration keys on the web interface.

Instead, I would simply do it as a class field which you set in the constructor.

public static final class Tokenizer extends
RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private final String dbConnect;

    public Tokenizer(String dbConnect) {
        this.dbConnect = Preconditions.checkNotNull(dbConnect, "Input");
    }

    <...>
}

This way you also check the arguments before submitting the job and
not when the flat map is being executed.

– Ufuk


On Sun, Jul 17, 2016 at 7:16 PM, Chesnay Schepler <ches...@apache.org> wrote:
> Hello Chen,
>
> you can access the set configuration in your rich function like this:
>
> public static final class Tokenizer extends RichFlatMapFunction<String,
> Tuple2<String, Integer>> {
>       @Override
>       public void flatMap(String value, Collector<Tuple2<String, Integer>> 
> out) {
>               ParameterTool parameters = (ParameterTool)
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>               parameters.getRequired("input");
>               // .. do more ...
>
> Regards,
> Chesnay
>
>
> On 17.07.2016 18:22, Chen Bekor wrote:
>
> Hi,
>
> I Need some assistance -
>
> I’m trying to globally register arguments from my main function for further
> extraction on stream processing nodes. My code base is Scala:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val parameterTool = ParameterTool.fromArgs(args)
>
> env.getConfig.setGlobalJobParameters(parameterTool)
>
> but when I'm trying to retrieve them I get null pointer exception.
>
> private lazy val parameters: GlobalJobParameters =
> ExecutionEnvironment.getExecutionEnvironment.getConfig.getGlobalJobParameters
>
> Have read this article
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/best_practices.html
> and I’m curious if it is possible.
>
> This is required in order to read from a configuration file holding DB
> Connection String (Bootstrapping DB connections on each processing node on
> the bootstrapping phase.
>
> Regards,
> Chen.
>
>

Reply via email to