Nagy Attila Bálint created FLINK-39802:
------------------------------------------
Summary: Fix flaky ProfilingServiceTest.testRollingDeletion by
isolating the test and applying Interface Segregation to ProfilingService
Key: FLINK-39802
URL: https://issues.apache.org/jira/browse/FLINK-39802
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination, Runtime / REST
Affects Versions: 2.2.1
Reporter: Nagy Attila Bálint
h2. Summary & Problem Statement
The {{ProfilingServiceTest.testRollingDeletion}} [test
case|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L111]
fails intermittently (or consistently when run in a specific order) due to
shared static state leakage across a single JVM.
Currently, {{ProfilingService}} is
[implemented|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L58]
as a classic Singleton. The {{ProfilingService#getInstance(config)}}
[method|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L81]
only initializes the instance once.
If it has already been invoked by a previous test class, it will return the
existing static singleton instance and *completely ignore* the new
{{Configuration}} parameter passed to it.
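The behavior described above can be reproduced with a minimal, stdlib-only sketch. The names {{IgnoredConfigDemo}}, {{LazyService}} and the {{result.dir}} key are hypothetical stand-ins for illustration, not Flink's actual classes or options:

```java
import java.util.Map;

final class IgnoredConfigDemo {

    /** Hypothetical stand-in for a lazily initialized singleton; not Flink's actual class. */
    static final class LazyService {
        private static LazyService instance;
        private final String resultDir;

        private LazyService(Map<String, String> config) {
            // Fall back to /tmp when no directory is configured (assumed default for this sketch).
            this.resultDir = config.getOrDefault("result.dir", "/tmp");
        }

        static synchronized LazyService getInstance(Map<String, String> config) {
            if (instance == null) {
                instance = new LazyService(config);
            }
            // Later callers get the first instance back; their config argument is dropped.
            return instance;
        }

        String getResultDir() {
            return resultDir;
        }
    }

    public static void main(String[] args) {
        // An earlier caller initializes the singleton with defaults.
        LazyService first = LazyService.getInstance(Map.of());
        // A later caller asks for an isolated directory, but the request has no effect.
        LazyService second = LazyService.getInstance(Map.of("result.dir", "/tmp/junit12345"));
        System.out.println(first == second);       // true
        System.out.println(second.getResultDir()); // /tmp
    }
}
```

The second caller has no way to notice that its configuration was discarded, which is what makes the problem order-dependent.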
Furthermore, {{ProfilingService}} implements the {{java.io.Closeable}}
interface (which extends {{AutoCloseable}}), exposing a public {{close()}}
method. This method is implemented as
[follows|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L163]:
{code:java}
@Override
public void close() throws IOException {
    try {
        if (profilingFuture != null && !profilingFuture.isDone()) {
            profilingFuture.cancel();
        }
        if (!scheduledExecutor.isShutdown()) {
            scheduledExecutor.shutdownNow();
        }
    } catch (Exception e) {
        LOG.error("Exception thrown during stopping profiling service. ", e);
    } finally {
        instance = null; // <--- Anti-pattern
    }
}
{code}
Setting a shared static member to {{null}} inside an instance lifecycle method
is a *severe anti-pattern*.
It allows any single consumer to unexpectedly
[destroy|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L169]
the {{ScheduledExecutorService}} underneath all other concurrent users of the
singleton.
Moreover, calling {{getInstance()}} after a {{close()}} call will silently
spawn a brand new instance, potentially with completely different configuration
values.
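Both hazards can be illustrated with a small sketch. {{SharedCloseDemo}} and {{SharedService}} are hypothetical names; the class only imitates the pattern of shutting down an executor and resetting the static field in {{close()}}, and is not the Flink implementation:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SharedCloseDemo {

    /** Hypothetical singleton whose close() resets the static field. */
    static final class SharedService implements AutoCloseable {
        private static SharedService instance;
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        static synchronized SharedService getInstance() {
            if (instance == null) {
                instance = new SharedService();
            }
            return instance;
        }

        void schedule(Runnable task) {
            executor.schedule(task, 1, TimeUnit.MILLISECONDS);
        }

        @Override
        public void close() {
            executor.shutdownNow();
            synchronized (SharedService.class) {
                instance = null; // the problematic reset
            }
        }
    }

    /** Returns true if a reference obtained before close() can no longer schedule work. */
    static boolean holderIsBrokenAfterForeignClose() {
        SharedService holderA = SharedService.getInstance();
        SharedService holderB = SharedService.getInstance();
        holderB.close(); // B believes it is done with the service
        try {
            holderA.schedule(() -> {});
            return false;
        } catch (RejectedExecutionException e) {
            return true; // A's executor was shut down underneath it
        }
    }

    /** Returns true if getInstance() hands out a different object after close(). */
    static boolean closeSpawnsNewInstance() {
        SharedService before = SharedService.getInstance();
        before.close();
        SharedService after = SharedService.getInstance();
        boolean different = before != after;
        after.close(); // shut the new executor down again
        return different;
    }

    public static void main(String[] args) {
        System.out.println(holderIsBrokenAfterForeignClose()); // true
        System.out.println(closeSpawnsNewInstance());          // true
    }
}
```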
Fortunately, {{ProfilingService::close()}} is *never called in production code*
(neither explicitly nor implicitly via *try-with-resources*).
In the test codebase, it is only ever utilized
[within|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L46]
{{ProfilingServiceTest}}.
h2. Root Cause Analysis of the Test Failure
The failure of {{ProfilingServiceTest#testRollingDeletion}} highlights the
danger of this shared JVM state.
When multiple tests run sequentially or concurrently within the same fork,
preceding test classes initialize the {{ProfilingService}} singleton with the
default configuration, in which
[the|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L73]
{{PROFILING_RESULT_DIR}} defaults to {{/tmp}}.
When {{ProfilingServiceTest#setUp}}
[runs|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L57],
its configuration initialization (intended to guide the profiler to a clean,
isolated {{@TempDir}}) has *zero effect*, because the old singleton instance
pointing to {{/tmp}} is returned.
h3. Consequences on the File System:
Instead of executing inside the isolated {{/tmp/junitXXXXX}} temporary
directories, the test writes its {{.html}} output files prefixed
[with|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L49]
{{"TestJobManager"}} directly into the global {{/tmp}} directory.
When {{ProfilingServiceTest#verifyRollingDeletionWorks()}}
[executes|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L176]
{{new File(profilingService.getProfilingResultDir()).listFiles()}}, it reads
the root {{/tmp}} directory and mistakenly picks up stray {{.html}} files left
behind by entirely different or previous test runs.
This leads to a file count mismatch:
{noformat}
ProfilingServiceTest.testRollingDeletion:XXX->verifyRollingDeletionWorks:XXX
expected: <3> but was: <XX>
{noformat}
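The count mismatch can be sketched as follows, assuming the verification step counts everything returned by a plain {{listFiles()}} call. The class name, the {{OtherRun}} prefix and the file counts are made up for illustration; two fresh temporary directories stand in for the global {{/tmp}} and for the isolated {{@TempDir}}:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class StrayFileCountDemo {

    /** Counts directory entries via a plain, unfiltered listFiles() call. */
    static int countEntries(Path dir) {
        File[] files = dir.toFile().listFiles();
        return files == null ? 0 : files.length;
    }

    /** Writes the given number of empty .html files with the given prefix. */
    static void writeResults(Path dir, String prefix, int count) throws IOException {
        for (int i = 0; i < count; i++) {
            Files.createFile(dir.resolve(prefix + "_" + i + ".html"));
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the global /tmp: it already holds leftovers from other runs.
        Path shared = Files.createTempDirectory("shared");
        writeResults(shared, "OtherRun", 4);
        writeResults(shared, "TestJobManager", 3);

        // Stand-in for the isolated per-test directory.
        Path isolated = Files.createTempDirectory("isolated");
        writeResults(isolated, "TestJobManager", 3);

        System.out.println(countEntries(shared));   // 7
        System.out.println(countEntries(isolated)); // 3
    }
}
```

The sketch leaves its temporary directories behind; a real test would rely on {{@TempDir}} for cleanup.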
h2. How to Reproduce
The issue can be reliably reproduced even on a single thread (sequential
execution) by ensuring a test that initializes the default profiling singleton
runs right before the profiling service test:
{noformat}
./mvnw clean test -pl flink-runtime \
-Dfast -Djdk17 -Pjava17-target -fae \
-Dflink.forkCountUnitTest=1 \
-Dtest=ProfilingServiceTest,ShuffleMasterTest
{noformat}
*Important:* {{ShuffleMasterTest}} must execute before {{ProfilingServiceTest}}
to pollute the static instance and trigger the failure.
h3. List of Shared-State Polluting Tests (non-exhaustive)
The following test classes have been verified to call
{{ProfilingService::getInstance}} and can pollute the environment with the
default {{/tmp}} directory. The list is not exhaustive:
{noformat}
org.apache.flink.runtime.shuffle.ShuffleMasterTest
org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventHandlingTest
org.apache.flink.runtime.taskexecutor.TaskManagerRunnerStartupTest
org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
org.apache.flink.runtime.taskexecutor.TaskExecutorSlotLifetimeTest
org.apache.flink.runtime.taskexecutor.TaskExecutorExecutionDeploymentReconciliationTest
org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest
org.apache.flink.runtime.taskexecutor.TaskExecutorRecoveryTest
org.apache.flink.runtime.taskexecutor.TaskExecutorTest
org.apache.flink.runtime.io.network.partition.PartialConsumePipelinedResultTest
org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest
org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest
{noformat}
h2. Proposed Changes
h3. Part 1: Test-Level Isolation & Reset
To fix the immediate test failure, we must isolate {{ProfilingServiceTest}} and
force a clean state reset before each test execution:
1.) Add the JUnit 5 {{@Isolated}} annotation to the {{ProfilingServiceTest}}
class so that it never runs concurrently with any other test class.
2.) In the {{@BeforeEach setUp()}} method, explicitly call
{{ProfilingService.getInstance(configs).close();}} to discard any polluted
singleton instance inherited from preceding tests, so that the custom
{{@TempDir}} config takes effect on the next {{getInstance(configs)}} call.
*Note:* Without the {{@Isolated}} annotation, calling {{close()}} inside
{{setUp()}} would have disastrous, unpredictable consequences on other
concurrently running tests that depend on the shared singleton.
The exact same risk applies to implicit {{close()}} calls if a developer
mistakenly uses the service inside a {{try-with-resources}} block.
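The reset step can be sketched in plain Java as follows. {{ResetBeforeTestDemo}}, {{ResettableService}} and the {{result.dir}} key are hypothetical; the JUnit annotations ({{@Isolated}}, {{@BeforeEach}}) are left out so that the sketch needs only the standard library, and the static {{setUp}} method merely imitates what the annotated method would do:

```java
import java.util.Map;

final class ResetBeforeTestDemo {

    /** Hypothetical closeable singleton; a stand-in, not Flink's ProfilingService. */
    static final class ResettableService implements AutoCloseable {
        private static ResettableService instance;
        private final String resultDir;

        private ResettableService(Map<String, String> config) {
            this.resultDir = config.getOrDefault("result.dir", "/tmp");
        }

        static synchronized ResettableService getInstance(Map<String, String> config) {
            if (instance == null) {
                instance = new ResettableService(config);
            }
            return instance;
        }

        String getResultDir() {
            return resultDir;
        }

        @Override
        public void close() {
            synchronized (ResettableService.class) {
                instance = null;
            }
        }
    }

    /** Imitates the proposed setUp(): drop whatever instance exists, then re-create it. */
    static ResettableService setUp(Map<String, String> testConfig) {
        ResettableService.getInstance(testConfig).close();
        return ResettableService.getInstance(testConfig);
    }

    public static void main(String[] args) {
        // An earlier test class initializes the singleton with defaults.
        ResettableService.getInstance(Map.of());
        // The reset makes the test's own configuration take effect.
        ResettableService service = setUp(Map.of("result.dir", "/tmp/junit12345"));
        System.out.println(service.getResultDir()); // /tmp/junit12345
    }
}
```

The reset is only safe when no other test holds the old instance at the same time, which is why the proposal pairs it with {{@Isolated}}.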
h3. Part 2: Long-Term Architectural Fix via Interface Segregation (*proposal*)
To prevent developers from accidentally invoking {{close()}} on the singleton
instance in production or through unmanaged {{try-with-resources}} blocks, we
should implement *Interface Segregation*.
1.) Convert {{ProfilingService}} into an interface containing all original
public functional methods, excluding {{close()}}:
{code:java}
public interface ProfilingService {
    CompletableFuture<ProfilingInfo> requestProfiling(
            String resourceID, long duration, ProfilingInfo.ProfilingMode mode);

    CompletableFuture<Collection<ProfilingInfo>> getProfilingList(String resourceID);

    String getProfilingResultDir();
}
{code}
2.) Move the actual implementation details to a new concrete class named e.g.
{{ProfilingServiceSingleton}} which implements both our interface and
{{Closeable}}:
{code:java}
public class ProfilingServiceSingleton implements ProfilingService, Closeable {
    // Original implementation stays here...

    public static ProfilingService getInstance(Configuration configs) {
        if (instance == null) {
            synchronized (ProfilingServiceSingleton.class) {
                if (instance == null) {
                    instance = new ProfilingServiceSingleton(configs);
                }
            }
        }
        return instance;
    }
}
{code}
By returning the {{ProfilingService}} interface type from {{getInstance()}},
the public {{close()}} method is cleanly hidden from production consumers.
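A reduced, self-contained sketch of the same idea is shown below. {{InterfaceSegregationDemo}}, {{Service}} and {{ServiceSingleton}} are hypothetical names, and the sketch assumes the static field is declared {{volatile}}, which double-checked locking needs in order to be safe:

```java
import java.io.Closeable;
import java.util.Arrays;

final class InterfaceSegregationDemo {

    /** Narrow view handed to consumers: no lifecycle methods. */
    interface Service {
        String getResultDir();
    }

    /** Hypothetical implementation that keeps close() for its owner and for tests. */
    static final class ServiceSingleton implements Service, Closeable {
        // volatile is required for double-checked locking to be safe.
        private static volatile ServiceSingleton instance;

        static Service getInstance() {
            if (instance == null) {
                synchronized (ServiceSingleton.class) {
                    if (instance == null) {
                        instance = new ServiceSingleton();
                    }
                }
            }
            return instance;
        }

        @Override
        public String getResultDir() {
            return "/tmp";
        }

        @Override
        public void close() {
            synchronized (ServiceSingleton.class) {
                instance = null;
            }
        }
    }

    /** Returns true if the given type declares or inherits a public close() method. */
    static boolean exposesClose(Class<?> type) {
        return Arrays.stream(type.getMethods()).anyMatch(m -> m.getName().equals("close"));
    }

    public static void main(String[] args) {
        Service service = ServiceSingleton.getInstance();
        // service.close();                                      // does not compile
        // try (Service s = ServiceSingleton.getInstance()) { }  // does not compile either
        System.out.println(service.getResultDir());               // /tmp
        System.out.println(exposesClose(Service.class));          // false
        System.out.println(exposesClose(ServiceSingleton.class)); // true
    }
}
```

A consumer can still downcast to the concrete class, so this guards against accidental misuse rather than deliberate misuse.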
This requires changing the class name at exactly *3 production call sites*
where {{.getInstance(...)}} is invoked:
{noformat}
org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingHandler
org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingListHandler
org.apache.flink.runtime.taskexecutor.TaskExecutor
{noformat}
They will seamlessly compile using the interface:
{code:java}
// Keeps production code safe from accidental lifecycle termination.
// Old class name:
// this.profilingService = ProfilingService.getInstance(configuration);
// New class name:
this.profilingService = ProfilingServiceSingleton.getInstance(configuration);
{code}
Since {{ProfilingService::close}} is currently not called anywhere in
production, this change does not alter production behavior (only the class
name at the call sites changes). It eliminates a dangerous anti-pattern and,
together with Part 1, removes the shared-state dependency behind the flaky
{{ProfilingServiceTest}}.
h2. Testing & Verification
To verify the proposed fix and explore the exact conditions under which this
shared-state flakiness manifests, I executed a series of stress tests using a
*single Maven fork* ({{-Dflink.forkCountUnitTest=1}}).
This forces all targeted test classes into a single, shared JVM environment
where the static singleton instance issue can be actively reproduced and
monitored.
h3. Preparation Step (POM Modification)
Before running the commands, I commented out the hardcoded
{{<junit.jupiter.execution.parallel.*>}} system property variables under the
maven-surefire-plugin configuration block in the root {{pom.xml}}.
This was necessary to cleanly pass customized parallel execution flags
externally using the {{-Dsurefire.module.config}} property.
h3. Test Scenario 1: Single JVM, Concurrent-Class Sequential Execution (1 Thread)
In this scenario, I configured JUnit 5 to enable parallel class execution but
throttled the underlying fixed thread pool down to exactly 1 thread
({{max-pool-size=1}}, {{parallelism=1}}).
This forces the classes to run sequentially on a single thread while using the
concurrent class scheduling mechanism.
*Execution Command:*
{noformat}
./mvnw test -pl flink-runtime \
-Dfast -Djdk17 -Pjava17-target -fae \
-Dgit.commit.id.skip=true -Dmaven.gitcommitid.skip=true \
-Dflink.forkCountUnitTest=1 \
-Dsurefire.module.config="--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED
-Djunit.jupiter.execution.parallel.enabled=true
-Djunit.jupiter.execution.parallel.mode.classes.default=concurrent
-Djunit.jupiter.execution.parallel.mode.default=same_thread
-Djunit.jupiter.execution.parallel.config.strategy=fixed
-Djunit.jupiter.execution.parallel.config.fixed.max-pool-size=1
-Djunit.jupiter.execution.parallel.config.fixed.saturate=true
-Djunit.jupiter.execution.parallel.config.fixed.parallelism=1" \
-Dtest=ProfilingServiceTest,ShuffleMasterTest,TaskExecutorOperatorEventHandlingTest,TaskManagerRunnerStartupTest,TaskExecutorPartitionLifecycleTest,TaskManagerRunnerTest,TaskExecutorSlotLifetimeTest,TaskExecutorExecutionDeploymentReconciliationTest,TaskExecutorSubmissionTest,TaskExecutorRecoveryTest,TaskExecutorTest,PartialConsumePipelinedResultTest,LeaderChangeClusterComponentsTest,SlotCountExceedingParallelismTest
{noformat}
*Observation (Before Fix):*
As expected, the tests executing ahead of {{ProfilingServiceTest}} pollute the
JVM-wide static singleton.
When {{ProfilingServiceTest}} eventually executes, its setup configuration is
ignored, and the test fails on the rolling deletion assertion as the path
defaults back to {{/tmp}}.
h3. Test Scenario 2: Single JVM, High Parallelism Stress Test (200 Threads)
Next, I ramped up the configuration to test heavy concurrent execution within
the same JVM by increasing both {{fixed.max-pool-size}} and
{{fixed.parallelism}} to *200*.
*Observation (Before Fix - Severe Flakiness Masking):*
Under heavy multi-threaded contention, I observed a highly dangerous side
effect of the {{close()}} anti-pattern.
Out of the *5* test cases defined inside {{ProfilingServiceTest}}, *only 1
test case actually executed*, while the remaining 4 were silently dropped.
To make matters worse, Maven still reported the run as successful (green
build):
{noformat}
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 27.55 s -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
{noformat}
This proves that without strict isolation and proper interface boundaries, the
current {{ProfilingService::close}} logic actively masks test execution results
during concurrent test execution, reporting successful builds despite dropped
test coverage.
h2. Verification Results After Patch
After applying the proposed changes (*Interface Segregation* via
{{ProfilingServiceSingleton}} + {{@Isolated}} on {{ProfilingServiceTest}} +
*Explicit* {{close()}} state reset in {{setUp}}):
*Scenario 1 (Single Thread / Shared JVM):* Pass. Every preceding test runs
successfully, and {{ProfilingServiceTest}} cleanly resets the state to work
inside its assigned {{@TempDir}}.
*Scenario 2 (200 Threads / High Contention):* Pass. All 5 test cases inside
{{ProfilingServiceTest}} are safely quarantined, execute successfully in their
entirety, and no longer swallow test executions or pollute the surrounding
environment.
h2. Previous JIRAs around this issue:
FLINK-38442: ProfilingServiceTest.testRollingDeletion is unstable (Resolved)
FLINK-35571: ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation (Open)
FLINK-34013: ProfilingServiceTest.testRollingDeletion is unstable on AZP (Resolved)