godfrey he created FLINK-23094:
----------------------------------

             Summary: encounter thread-safe problem when using StreamExecutionEnvironment#initializeContextEnvironment in multiple-threads environment
                 Key: FLINK-23094
                 URL: https://issues.apache.org/jira/browse/FLINK-23094
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.14.0
            Reporter: godfrey he

A thread-safety problem occurs when {{StreamExecutionEnvironment#initializeContextEnvironment}} (or related code, such as {{PackagedProgramUtils#getPipelineFromProgram}}) is called from multiple threads. The cause is that {{initializeContextEnvironment}} is not thread-safe:

{code:java}
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    // Reads the static field back; another thread may have overwritten it by now.
    threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
}
{code}

{{contextEnvironmentFactory}} is a static variable. When several threads execute {{initializeContextEnvironment}} concurrently, another thread may overwrite {{contextEnvironmentFactory}} between the assignment and the call to {{threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory)}}. The calling thread then stores another thread's factory in its own thread-local.

The solution is to pass the local variable {{ctx}} instead of the static variable {{contextEnvironmentFactory}}:

{code:java}
protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
    contextEnvironmentFactory = ctx;
    // Uses the caller's own argument, which no other thread can change.
    threadLocalContextEnvironmentFactory.set(ctx);
}
{code}

Another open question is whether {{contextEnvironmentFactory}} is really needed. Currently it is never set or reset on its own; it is always changed together with {{threadLocalContextEnvironmentFactory}}.
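
The interleaving described above can be reproduced deterministically with a small standalone sketch. It does not use Flink: the class {{ContextFactoryRaceSketch}}, its fields, the {{String}} factories and the {{Runnable}} hook are all hypothetical stand-ins, and the hook exists only to force a second thread to run between the two statements so that the outcome does not depend on timing.

```java
// Hypothetical stand-in for the pattern in the report; none of these names are Flink APIs.
class ContextFactoryRaceSketch {
    // Plays the role of the static contextEnvironmentFactory field.
    private static volatile String sharedFactory;
    // Plays the role of threadLocalContextEnvironmentFactory.
    private static final ThreadLocal<String> threadLocalFactory = new ThreadLocal<>();

    // Reported version: the thread-local is filled from the static field.
    // betweenStatements is an artificial hook used to force the bad interleaving.
    static void initializeUnsafe(String ctx, Runnable betweenStatements) {
        sharedFactory = ctx;
        betweenStatements.run();
        threadLocalFactory.set(sharedFactory);
    }

    // Proposed version: the thread-local is filled from the caller's own argument.
    static void initializeSafe(String ctx, Runnable betweenStatements) {
        sharedFactory = ctx;
        betweenStatements.run();
        threadLocalFactory.set(ctx);
    }

    // Runs a second thread to completion; it initializes its own factory,
    // overwriting the static field in the process.
    static void interfere() {
        Thread other = new Thread(() -> initializeSafe("factory-B", () -> { }));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // What the calling thread ends up with in its thread-local, reported version.
    static String observedUnsafe() {
        initializeUnsafe("factory-A", ContextFactoryRaceSketch::interfere);
        return threadLocalFactory.get();
    }

    // What the calling thread ends up with in its thread-local, proposed version.
    static String observedSafe() {
        initializeSafe("factory-A", ContextFactoryRaceSketch::interfere);
        return threadLocalFactory.get();
    }

    public static void main(String[] args) {
        System.out.println("unsafe: " + observedUnsafe()); // prints "unsafe: factory-B"
        System.out.println("safe:   " + observedSafe());   // prints "safe:   factory-A"
    }
}
```

In the unsafe variant the calling thread asked for {{factory-A}} but ends up with {{factory-B}} in its thread-local. With the proposed change the thread-local always holds the caller's own argument, although the static field still reflects whichever thread wrote last.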