A workaround may be to use the DistributedCache. It apparently is not
documented much, but the JavaDoc describes roughly how to use it:

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954

/**
 * Registers a file at the distributed cache under the given name. The file will be
 * accessible from any user-defined function in the (distributed) runtime under a local
 * path. Files may be local files (as long as all relevant workers have access to it),
 * or files in a distributed file system. The runtime will copy the files temporarily
 * to a local cache, if needed.
 * <p>
 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained
 * inside UDFs via
 * {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
 * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
 *                 "hdfs://host:port/and/path")
 * @param name     The name under which the file is registered.
 */
public void registerCachedFile(String filePath, String name) {
        registerCachedFile(filePath, name, false);
}
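
The boolean in the delegated call is the "executable" flag. There is a
three-argument overload in case the cached file needs to be executable on the
workers, e.g. (path and name here are just placeholders):

env.registerCachedFile("file:///path/to/some/script.sh", "myScript", true);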

You could then pass the actual file URL via a simple job parameter, so that each
instance of your job can use a different file:


public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        // assuming the usual environment setup, e.g.:
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ...

        env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");

        ...
}
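
The parameter is then supplied on the command line when submitting the job, e.g.
(jar name and path are just placeholders):

flink run myJob.jar --config_file hdfs:///path/to/application.conf

ParameterTool.fromArgs() parses such "--key value" pairs.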

Flink's DistributedCache will then cache the file locally and you can use it in
a RichFunction like in
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99

public class MyFunction extends AbstractRichFunction {
        private static final long serialVersionUID = 1L;

        @Override
        public void open(Configuration conf) throws IOException {
                File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
                ...
        }
}
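
The returned File points to the local copy of the cached file, so any standard
Java IO works on it. A minimal sketch of reading it inside open(), assuming
(just for illustration) that the file is in java.util.Properties format:

        File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
        // parse the local copy; the Properties format is only an assumption here,
        // "extConfig" must match the name passed to registerCachedFile()
        Properties config = new Properties();
        try (InputStream in = new FileInputStream(file)) {
                config.load(in);
        }

(This needs java.util.Properties, java.io.InputStream and java.io.FileInputStream
imports.)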


Nico

On Monday, 19 June 2017 14:42:12 CEST Mikhail Pryakhin wrote:
> Hi guys,
> 
> any news?
> I’ve created a JIRA ticket: https://issues.apache.org/jira/browse/FLINK-6949
> 
> 
> Kind Regards,
> Mike Pryakhin
> 
> > On 16 Jun 2017, at 16:35, Mikhail Pryakhin <m.prya...@gmail.com> wrote:
> > 
> > Hi all,
> > 
> > I run my flink job on a yarn cluster and need to supply job configuration
> > parameters via a configuration file alongside the job jar (the
> > configuration file can't be packaged into the job's jar file). I tried to
> > put the configuration file into the folder that is passed via the
> > --yarnship option to the flink run command; this file is then copied to
> > the yarn cluster and added to the JVM classpath as 'path/application.conf',
> > but is ignored by the TM JVM as it is neither a jar (zip) file nor a
> > directory...
> > 
> > I looked through the YarnClusterDescriptor class where the
> > ENV_FLINK_CLASSPATH is built and haven't found any option to tell
> > flink (the YarnClusterDescriptor in particular) to add my configuration
> > file to the TM JVM classpath... Is there any way to do so? If not, would
> > you consider adding such an ability to add files? (In spark I can just
> > pass any files via the --files option.)
> > 
> > Thanks in advance.
> > 
> > Kind Regards,
> > Mike Pryakhin
