godfrey he created FLINK-23094:
----------------------------------

             Summary: encounter thread-safe problem when using 
StreamExecutionEnvironment#initializeContextEnvironment in multiple-threads 
environment
                 Key: FLINK-23094
                 URL: https://issues.apache.org/jira/browse/FLINK-23094
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.14.0
            Reporter: godfrey he


encounter thread-safe problem when using 
StreamExecutionEnvironment#initializeContextEnvironment (or related code, such 
as PackagedProgramUtils#getPipelineFromProgram) in multiple-threads environment.

The reason is the {{initializeContextEnvironment}} method is not thread-safe:
{code:java}
    protected static void 
initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }
{code}

{{contextEnvironmentFactory}} is a static variable, and when 
{{initializeContextEnvironment}} is executed in multiple-threads environment, 
the value of {{contextEnvironmentFactory}} may be changed by other thread when 
executing {{ 
threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);}}

The solution is: use the local variable {{ctx}} instead of the static variable 
{{contextEnvironmentFactory}}.
 
{code:java}
    protected static void 
initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) {
        contextEnvironmentFactory = ctx;
        threadLocalContextEnvironmentFactory.set(ctx);
    }
{code}

Another thing I doubt is whether {{contextEnvironmentFactory}} is really needed 
? Currently, {{contextEnvironmentFactory}} is not set or reset individually, it 
is always changed with {{threadLocalContextEnvironmentFactory}}.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to