This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 32c657c1426 [FLINK-32592] Fix thread-safety of context (Stream)ExEnv initializer 32c657c1426 is described below commit 32c657c14266531d3c7b1f448d96f513661a59d6 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Fri Jul 14 19:51:20 2023 +0200 [FLINK-32592] Fix thread-safety of context (Stream)ExEnv initializer --- .../flink/api/java/ExecutionEnvironment.java | 2 +- .../flink/api/java/ExecutionEnvironmentTest.java | 71 ++++++++++++++++++++++ .../environment/StreamExecutionEnvironment.java | 2 +- .../StreamExecutionEnvironmentTest.java | 48 ++++++++++++++- 4 files changed, 119 insertions(+), 4 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index a7b0cabedfc..9ed24215d35 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -1422,7 +1422,7 @@ public class ExecutionEnvironment { */ protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = Preconditions.checkNotNull(ctx); - threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(ctx); } /** diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java new file mode 100644 index 00000000000..e4c859e91ed --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutionEnvironmentTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java; + +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.assertj.core.api.Assertions.assertThat; + +class ExecutionEnvironmentTest { + + @Test + void testConcurrentSetContext() throws Exception { + int numThreads = 20; + final CountDownLatch waitingThreadCount = new CountDownLatch(numThreads); + final OneShotLatch latch = new OneShotLatch(); + final List<CheckedThread> threads = new ArrayList<>(); + for (int x = 0; x < numThreads; x++) { + final CheckedThread thread = + new CheckedThread() { + @Override + public void go() throws InterruptedException { + final ExecutionEnvironment preparedEnvironment = + new ExecutionEnvironment(); + ExecutionEnvironment.initializeContextEnvironment( + () -> preparedEnvironment); + try { + waitingThreadCount.countDown(); + latch.await(); + assertThat(ExecutionEnvironment.getExecutionEnvironment()) + .isSameAs(preparedEnvironment); + } finally { + ExecutionEnvironment.resetContextEnvironment(); + } + } + }; + thread.start(); + threads.add(thread); + } + + // wait for all threads to be ready and trigger the job submissions at the same time + waitingThreadCount.await(); + latch.trigger(); + + for (CheckedThread thread : threads) { + thread.sync(); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 955afff19a6..e941386b66d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -2578,7 +2578,7 @@ public class StreamExecutionEnvironment implements AutoCloseable { protected static void initializeContextEnvironment(StreamExecutionEnvironmentFactory ctx) { contextEnvironmentFactory = ctx; - threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory); + threadLocalContextEnvironmentFactory.set(ctx); } protected static void resetContextEnvironment() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java similarity index 91% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index 0d8836d8482..39293c19b42 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api; +package org.apache.flink.streaming.api.environment; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -28,11 +28,12 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -45,13 +46,16 @@ import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.CountDownLatch; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -453,6 +457,46 @@ public class StreamExecutionEnvironmentTest { } } + @Test + public void testConcurrentSetContext() throws Exception { + int numThreads = 20; + final CountDownLatch waitingThreadCount = new CountDownLatch(numThreads); + final OneShotLatch latch = new OneShotLatch(); + final List<CheckedThread> threads = new ArrayList<>(); + for (int x = 0; x < numThreads; x++) { + final CheckedThread thread = + new CheckedThread() { + @Override + public void go() throws InterruptedException { + final StreamExecutionEnvironment preparedEnvironment = + new StreamExecutionEnvironment(); + StreamExecutionEnvironment.initializeContextEnvironment( + configuration -> preparedEnvironment); + try { + waitingThreadCount.countDown(); + latch.await(); + Assertions.assertThat( + StreamExecutionEnvironment + .getExecutionEnvironment()) + .isSameAs(preparedEnvironment); + } finally { + StreamExecutionEnvironment.resetContextEnvironment(); + } + } + }; + thread.start(); + threads.add(thread); + } + + // wait for all threads to be ready and trigger the job submissions at the same time + waitingThreadCount.await(); + latch.trigger(); + + for (CheckedThread thread : threads) { + thread.sync(); + } + } + ///////////////////////////////////////////////////////////// // Utilities /////////////////////////////////////////////////////////////