Hi ouywl,

*>>    Thread.currentThread().getContextClassLoader();*

What does this statement mean in your program?

In addition, can you share your implementation of the customized file
system plugin and the related exception?

Best,
Vino

ouywl <ou...@139.com> 于2019年12月18日周三 下午4:59写道:

> Hi all,
>     We have implemented a filesystem plugin for sink data to hdfs1, and
> the yarn for flink running is used hdfs2. So when the job running, the
> jobmanager use the conf of hdfs1 to create filesystem, the filesystem
> plugin  is conflict with flink component.
>     We implemeted step:
>       1.  ‘FileSystemEnhance’ is implement from “FileSystem”
>       2.  ‘FileSystemFactoryEnhance’ is implement from “FileSystemFactory”,add
> kerberos auth in ”FileSystemFactoryEnhance"
>       3. Add a service entry. Create a file
> META-INF/services/org.apache.flink.core.fs.FileSystemFactory which
> contains the class name of “ FileSystemFactoryEnhance.class”
>
> And  the job mainclass is :
>    “ *public static void main(String[] args) throws Exception{*
>
> *    StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();*
>
>
>
>
>
>
>
>
> *    env.enableCheckpointing(60*1000);    
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     env.getConfig().enableSysoutLogging();    Properties props = new 
> Properties();    props.put("bootstrap.servers", SERVERS);    
> props.put("group.id <http://group.id>", GROUPID);    
> props.put("enable.auto.commit", "true");    // 
> props.put("auto.commit.interval.ms <http://auto.commit.interval.ms>", 
> "1000");    props.put("session.timeout.ms <http://session.timeout.ms>", 
> "30000");    props.put("auto.offset.reset", "latest");    
> props.put("key.deserializer", 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());    
> FlinkKafkaConsumer010 consumer011 = new 
> FlinkKafkaConsumer010<String>("zyf_test_2", new SimpleStringSchema(), props); 
>    DataStream<String> source = env.addSource(consumer011).setParallelism(1);  
>   source.print();    Thread.currentThread().getContextClassLoader();    
> StreamingFileSink sink = StreamingFileSink            .forRowFormat(new 
> Path("hdfs://bdms-test/user/sloth/zyf"), new SimpleStringEncoder<>("UTF-8"))  
>           .build();    source.addSink(sink);    env.execute();}”And start the 
> job, the jobmanager filesystem is error, the log means the jobmananger use 
> “FileSystemFactoryEnhance” filesystem and confict.As the url 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems
>  
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#pluggable-file-systems>
>  how to avoid use “Thread.currentThread().getContextClassLoader()"*
>
>
> ouywl
> ou...@139.com
>
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=ouywl&uid=ouywl%40139.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fsma8dc7719018ba2517da7111b3db5a170.jpg&items=%5B%22ouywl%40139.com%22%5D>
>
>

Reply via email to