[ 
https://issues.apache.org/jira/browse/FLINK-24558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bai sui updated FLINK-24558:
----------------------------
    Labels: pull-request-available  (was: pull-request-available stale-assigned)

> dataStream can not use multiple classloaders 
> ---------------------------------------------
>
>                 Key: FLINK-24558
>                 URL: https://issues.apache.org/jira/browse/FLINK-24558
>             Project: Flink
>          Issue Type: Improvement
>          Components: Command Line Client
>            Reporter: bai sui
>            Assignee: bai sui
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: Flink ClassLoader优化 (1).png
>
>
> Consider the DataStream demo below. It is a very simple example that reads
> stream data from a SourceFunction and sends it to a SinkFunction without any
> processing.
> The point is that the SourceFunction and SinkFunction instances are each
> created with a separate URLClassLoader that has different dependencies, in
> order to avoid code conflicts.
> The problem is that when the Flink client submits the job to the server, the
> server side throws a ClassNotFoundException for a class that is defined in
> those classloaders' dependencies. Evidently the server side does not use the
> same classloaders as the client side.
> How can I solve this problem? Can anyone give me some advice?
> Thanks a lot.
>  
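> {code:java}
> import java.net.URL;
> import java.net.URLClassLoader;
> import java.util.Iterator;
> import java.util.ServiceLoader;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>
> // DTO is the record type used by the job; its definition is not part of this report.
> public class FlinkStreamDemo {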
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         SourceFunction<DTO> sourceFunc = createSourceFunction();
>         DataStreamSource<DTO> dtoDataStreamSource = env.addSource(sourceFunc);
>         SinkFunction<DTO> sinkFunction = createSink();
>         dtoDataStreamSource.addSink(sinkFunction);
>         env.execute("flink-example");
>     }
>     private static SinkFunction<DTO> createSink() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISinkFunctionFactory> loaders =
>                 ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
>         Iterator<ISinkFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>     private static SourceFunction<DTO> createSourceFunction() {
>         URL[] urls = new URL[]{...};
>         ClassLoader classLoader = new URLClassLoader(urls);
>         ServiceLoader<ISourceFunctionFactory> loaders =
>                 ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
>         Iterator<ISourceFunctionFactory> it = loaders.iterator();
>         if (it.hasNext()) {
>             return it.next().create();
>         }
>         throw new IllegalStateException();
>     }
>     public interface ISinkFunctionFactory {
>         SinkFunction<DTO> create();
>     }
>     public interface ISourceFunctionFactory {
>         SourceFunction<DTO> create();
>     }
> }
> {code}
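
The failure described above can be reproduced without Flink, using only
plain Java serialization. The sketch below is not Flink's implementation; it
only assumes that a function object is serialized on one side and
deserialized on the other with a class loader that cannot see the function's
class. All names in it (ClassLoaderSerializationSketch, DemoFunction,
LoaderAwareInputStream, MissingDependencyLoader) are hypothetical and exist
only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class ClassLoaderSerializationSketch {

    /** Hypothetical stand-in for a function object created on the client. */
    public static class DemoFunction implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public DemoFunction(String name) {
            this.name = name;
        }
    }

    /** ObjectInputStream that resolves classes through an explicit class loader. */
    public static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        public LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Simulates a receiving side whose class path lacks the function's jar. */
    public static class MissingDependencyLoader extends ClassLoader {
        public MissingDependencyLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (name.equals(DemoFunction.class.getName())) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    /** Returns false when the given loader cannot resolve a class in the stream. */
    public static boolean canDeserialize(byte[] data, ClassLoader loader) throws IOException {
        try {
            deserialize(data, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new DemoFunction("sink"));
        ClassLoader full = ClassLoaderSerializationSketch.class.getClassLoader();
        System.out.println(canDeserialize(data, full));                              // prints true
        System.out.println(canDeserialize(data, new MissingDependencyLoader(full))); // prints false
    }
}
```

The serialized bytes carry only the class name, not the class itself, so the
receiving side must be able to load that class through its own class loader.
A URLClassLoader created in the client's main method does not travel with
the serialized object.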



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
