Nagy Attila Bálint created FLINK-39802:
------------------------------------------

             Summary: Fix flaky ProfilingServiceTest.testRollingDeletion by isolating the test and applying Interface Segregation to ProfilingService
                 Key: FLINK-39802
                 URL: https://issues.apache.org/jira/browse/FLINK-39802
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination, Runtime / REST
    Affects Versions: 2.2.1
            Reporter: Nagy Attila Bálint


h2. Summary & Problem Statement
The {{ProfilingServiceTest.testRollingDeletion}} [test 
case|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L111]
 fails intermittently (or consistently when run in a specific order) because 
static state leaks between test classes that share a single JVM.

Currently, {{ProfilingService}} is 
[implemented|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L58]
 as a classic Singleton. The {{ProfilingService#getInstance(config)}} 
[method|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L81]
 only initializes the instance once.
If it has already been invoked earlier in the same JVM (for example by a 
previous test class), it returns the existing static singleton instance and 
*completely ignores* the new {{Configuration}} parameter passed to it.
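To make the first-call-wins behavior concrete, here is a minimal, self-contained sketch. It is not Flink's code: {{FirstCallWinsDemo}}, {{ToyProfilingService}} and {{resultDirSeenBySecondCaller}} are hypothetical names, and a plain {{String}} stands in for the {{Configuration}} and its {{PROFILING_RESULT_DIR}}. Only the lazy, once-only initialization is modelled.

{code:java}
import java.util.Objects;

// Hypothetical, simplified stand-in for ProfilingService (NOT Flink code).
// Only the lazy-initialization behavior is modelled; "resultDir" plays the
// role of PROFILING_RESULT_DIR from the Configuration.
final class FirstCallWinsDemo {

    static final class ToyProfilingService {
        private static volatile ToyProfilingService instance;
        private final String resultDir;

        private ToyProfilingService(String resultDir) {
            this.resultDir = Objects.requireNonNull(resultDir);
        }

        // Double-checked locking: the argument is only used by the very first call.
        static ToyProfilingService getInstance(String resultDir) {
            if (instance == null) {
                synchronized (ToyProfilingService.class) {
                    if (instance == null) {
                        instance = new ToyProfilingService(resultDir);
                    }
                }
            }
            return instance;
        }

        String getResultDir() {
            return resultDir;
        }
    }

    // An earlier test class initializes the singleton with the default directory;
    // a later setUp() then asks for its own temp-dir-like path.
    static String resultDirSeenBySecondCaller() {
        ToyProfilingService.getInstance("/tmp");
        return ToyProfilingService.getInstance("/tmp/junit123").getResultDir();
    }

    public static void main(String[] args) {
        System.out.println(resultDirSeenBySecondCaller()); // prints /tmp
    }
}
{code}

The second caller receives {{/tmp}} even though it asked for its own directory, which mirrors what {{ProfilingServiceTest#setUp}} experiences when another test class ran first.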

Furthermore, {{ProfilingService}} implements the {{java.io.Closeable}} 
interface (which extends {{AutoCloseable}}), exposing a public {{close()}} 
method. This method is implemented as 
[follows|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L163]:

{code:java}
@Override
public void close() throws IOException {
    try {
        if (profilingFuture != null && !profilingFuture.isDone()) {
            profilingFuture.cancel();
        }
        if (!scheduledExecutor.isShutdown()) {
            scheduledExecutor.shutdownNow();
        }
    } catch (Exception e) {
        LOG.error("Exception thrown during stopping profiling service. ", e);
    } finally {
        instance = null; // <--- Anti-pattern
    }
}
{code}

Setting a shared static member to {{null}} inside an instance lifecycle method 
is a *severe anti-pattern*.
Combined with the executor shutdown, it allows any single consumer to 
unexpectedly 
[destroy|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L169]
 the {{ScheduledExecutorService}} underneath all other users of the 
singleton.
Moreover, calling {{getInstance()}} after a {{close()}} call silently creates 
a brand-new instance, potentially with completely different configuration 
values.
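The following hypothetical sketch (a toy model, not the real {{ProfilingService}}) shows both effects described above: one consumer's {{close()}} shuts down the executor that another consumer still holds, and the next {{getInstance()}} silently builds a new instance with a different directory. The class, field and method names are illustrative assumptions.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical toy model of the close() anti-pattern (NOT Flink code): an
// instance-level close() that shuts down the shared executor AND resets the
// static singleton field.
final class CloseAntiPatternDemo {

    static final class ToyProfilingService implements AutoCloseable {
        private static ToyProfilingService instance; // guarded by ToyProfilingService.class
        final String resultDir;
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        private ToyProfilingService(String resultDir) {
            this.resultDir = resultDir;
        }

        static synchronized ToyProfilingService getInstance(String resultDir) {
            if (instance == null) {
                instance = new ToyProfilingService(resultDir);
            }
            return instance;
        }

        @Override
        public void close() {
            try {
                executor.shutdownNow(); // affects every holder of the singleton
            } finally {
                synchronized (ToyProfilingService.class) {
                    instance = null; // one consumer resets JVM-wide state
                }
            }
        }
    }

    static final class Result {
        final boolean otherConsumerExecutorShutDown;
        final String dirAfterReinit;

        Result(boolean otherConsumerExecutorShutDown, String dirAfterReinit) {
            this.otherConsumerExecutorShutDown = otherConsumerExecutorShutDown;
            this.dirAfterReinit = dirAfterReinit;
        }
    }

    static Result run() {
        ToyProfilingService consumerA = ToyProfilingService.getInstance("/tmp");
        ToyProfilingService consumerB = ToyProfilingService.getInstance("/ignored");
        consumerB.close(); // B only meant to release "its" service...
        boolean aBroken = consumerA.executor.isShutdown(); // ...but A's executor is gone too
        ToyProfilingService fresh = ToyProfilingService.getInstance("/somewhere/else");
        String freshDir = fresh.resultDir; // silently re-created with a different config
        fresh.close(); // release the new executor as well
        return new Result(aBroken, freshDir);
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.otherConsumerExecutorShutDown + " " + r.dirAfterReinit); // prints true /somewhere/else
    }
}
{code}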

Fortunately, {{ProfilingService::close()}} is *never called in production code* 
(neither explicitly nor implicitly via *try-with-resources*).
In the test codebase, it is only ever utilized 
[within|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L46]
 {{ProfilingServiceTest}}.

h2. Root Cause Analysis of the Test Failure
The failure of {{ProfilingServiceTest#testRollingDeletion}} highlights the 
danger of this shared JVM state.

When multiple tests run sequentially or concurrently within the same fork, 
preceding test classes initialize the {{ProfilingService}} singleton with 
default configuration values, under which 
[the|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java#L73]
 {{PROFILING_RESULT_DIR}} resolves to {{/tmp}}.

When {{ProfilingServiceTest#setUp}} 
[runs|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L57],
 its configuration initialization (intended to guide the profiler to a clean, 
isolated {{@TempDir}}) has *zero effect*, because the old singleton instance 
pointing to {{/tmp}} is returned.

h3. Consequences on the File System:
Instead of writing into the isolated {{/tmp/junitXXXXX}} temporary 
directory, the test writes its {{.html}} output files, prefixed 
[with|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L49]
 {{"TestJobManager"}}, directly into the global {{/tmp}} directory.

When {{ProfilingServiceTest#verifyRollingDeletionWorks()}} 
[executes|https://github.com/apache/flink/blob/59bb8b7a089b22665e06baee15c574541c402383/flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java#L176]
 {{new File(profilingService.getProfilingResultDir()).listFiles()}}, it reads 
the root {{/tmp}} directory and mistakenly picks up stray {{.html}} files left 
behind by entirely different or previous test runs.
This leads to a file count mismatch:
{noformat}
ProfilingServiceTest.testRollingDeletion:XXX->verifyRollingDeletionWorks:XXX expected: <3> but was: <XX>
{noformat}
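The mismatch can be reproduced in isolation with a small sketch that uses two scratch directories as stand-ins: one plays the shared {{/tmp}} (already containing leftovers from earlier runs), the other an isolated {{@TempDir}}. The file names and counts are made up for illustration; only the counting mirrors {{new File(...).listFiles()}} from the test.

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Toy reproduction of the count mismatch (NOT the real test): one scratch
// directory stands in for the shared /tmp, another for an isolated @TempDir.
final class StrayFileCountDemo {

    // Mirrors new File(dir).listFiles(): every entry in the directory is counted.
    static int countEntries(Path dir) {
        File[] files = dir.toFile().listFiles();
        return files == null ? 0 : files.length;
    }

    // Best-effort cleanup: children are deleted before their parent directory.
    static void cleanUp(Path root) {
        if (root == null) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            List<Path> all = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : all) {
                Files.delete(p);
            }
        } catch (IOException ignored) {
            // leftovers in the system temp dir are acceptable for a demo
        }
    }

    /** Returns {entries seen in the shared dir, entries seen in the isolated dir}. */
    static int[] run() {
        Path sharedTmp = null;
        Path isolated = null;
        try {
            sharedTmp = Files.createTempDirectory("shared-tmp");
            isolated = Files.createTempDirectory("junit");
            // Leftovers from earlier, unrelated runs (hypothetical names).
            for (int i = 0; i < 5; i++) {
                Files.createFile(sharedTmp.resolve("stray-" + i + ".html"));
            }
            // The same 3 result files, written once per scenario.
            for (int i = 0; i < 3; i++) {
                Files.createFile(sharedTmp.resolve("TestJobManager_" + i + ".html"));
                Files.createFile(isolated.resolve("TestJobManager_" + i + ".html"));
            }
            return new int[] {countEntries(sharedTmp), countEntries(isolated)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            cleanUp(sharedTmp);
            cleanUp(isolated);
        }
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("shared=" + counts[0] + " isolated=" + counts[1]); // prints shared=8 isolated=3
    }
}
{code}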

h2. How to Reproduce
The issue can be reliably reproduced even on a single thread (sequential 
execution) by ensuring that a test which initializes the default profiling 
singleton runs before the profiling service test:
{noformat}
./mvnw clean test -pl flink-runtime \
  -Dfast -Djdk17 -Pjava17-target -fae \
  -Dflink.forkCountUnitTest=1 \
  -Dtest=ProfilingServiceTest,ShuffleMasterTest
{noformat}

*Important:* {{ShuffleMasterTest}} must execute before {{ProfilingServiceTest}} 
to pollute the static instance and trigger the failure.

h3. List of Shared-State Polluting Tests (non-exhaustive)
The following test classes have been verified to call 
{{ProfilingService::getInstance}} and can pollute the environment with the 
default {{/tmp}} directory:
{noformat}
org.apache.flink.runtime.shuffle.ShuffleMasterTest
org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventHandlingTest
org.apache.flink.runtime.taskexecutor.TaskManagerRunnerStartupTest
org.apache.flink.runtime.taskexecutor.TaskExecutorPartitionLifecycleTest
org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTest
org.apache.flink.runtime.taskexecutor.TaskExecutorSlotLifetimeTest
org.apache.flink.runtime.taskexecutor.TaskExecutorExecutionDeploymentReconciliationTest
org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest
org.apache.flink.runtime.taskexecutor.TaskExecutorRecoveryTest
org.apache.flink.runtime.taskexecutor.TaskExecutorTest
org.apache.flink.runtime.io.network.partition.PartialConsumePipelinedResultTest
org.apache.flink.runtime.leaderelection.LeaderChangeClusterComponentsTest
org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest
{noformat}

h2. Proposed Changes
h3. Part 1: Test-Level Isolation & Reset
To fix the immediate test failure, we must isolate {{ProfilingServiceTest}} and 
force a clean state reset before each test execution:

1.) Add the JUnit 5 {{@Isolated}} annotation to the {{ProfilingServiceTest}} 
class to guarantee that no other test runs concurrently with it.

2.) In the {{@BeforeEach setUp()}} method, explicitly call 
{{ProfilingService.getInstance(configs).close();}} before obtaining the 
instance again. This wipes out any polluted singleton instance inherited from 
preceding tests and allows the custom {{@TempDir}} config to finally take 
effect.
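A minimal sketch of the reset in step 2, assuming a toy singleton instead of Flink's {{ProfilingService}} (the class and the {{setUp(String)}} helper are hypothetical). Calling {{getInstance(configs).close()}} when no instance exists yet simply creates and discards one, so the reset is harmless in that case.

{code:java}
import java.io.Closeable;

// Hypothetical toy singleton (NOT Flink code) used to model the Part 1 reset:
// close() the possibly polluted instance, then re-acquire it with the test config.
final class ResetInSetUpDemo {

    static final class ToyProfilingService implements Closeable {
        private static ToyProfilingService instance; // guarded by ToyProfilingService.class
        final String resultDir;

        private ToyProfilingService(String resultDir) {
            this.resultDir = resultDir;
        }

        static synchronized ToyProfilingService getInstance(String resultDir) {
            if (instance == null) {
                instance = new ToyProfilingService(resultDir);
            }
            return instance;
        }

        @Override
        public void close() {
            synchronized (ToyProfilingService.class) {
                instance = null;
            }
        }
    }

    // Mirrors the proposed @BeforeEach setUp(). Only safe while no other test
    // uses the singleton concurrently, which is what @Isolated is meant to ensure.
    static ToyProfilingService setUp(String tempDir) {
        ToyProfilingService.getInstance(tempDir).close(); // drop any inherited instance
        return ToyProfilingService.getInstance(tempDir); // now honors tempDir
    }

    public static void main(String[] args) {
        ToyProfilingService.getInstance("/tmp"); // polluted by an earlier test class
        System.out.println(setUp("/tmp/junit42").resultDir); // prints /tmp/junit42
    }
}
{code}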

*Note:* Without the {{@Isolated}} annotation, calling {{close()}} inside 
{{setUp()}} would have disastrous, unpredictable consequences on other 
concurrently running tests that depend on the shared singleton.
The exact same risk applies to implicit {{close()}} calls if a developer 
mistakenly uses the service inside a {{try-with-resources}} block.
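The try-with-resources risk can be sketched with the same kind of toy singleton (hypothetical names, not Flink code): the resource declared in the {{try}} header is the shared instance, so the implicit {{close()}} at the end of the block also stops the executor held by a long-lived consumer.

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy singleton (NOT Flink code) showing that try-with-resources
// implicitly calls close() on a shared instance.
final class TryWithResourcesDemo {

    static final class ToyProfilingService implements AutoCloseable {
        private static ToyProfilingService instance; // guarded by ToyProfilingService.class
        final ExecutorService executor = Executors.newSingleThreadExecutor();

        static synchronized ToyProfilingService getInstance() {
            if (instance == null) {
                instance = new ToyProfilingService();
            }
            return instance;
        }

        boolean isRunning() {
            return !executor.isShutdown();
        }

        @Override
        public void close() {
            executor.shutdownNow();
            synchronized (ToyProfilingService.class) {
                instance = null;
            }
        }
    }

    // Returns whether a long-lived holder still has a working service afterwards.
    static boolean longLivedHolderSurvives() {
        ToyProfilingService longLived = ToyProfilingService.getInstance(); // e.g. held by a handler
        try (ToyProfilingService scoped = ToyProfilingService.getInstance()) {
            // Looks like a local resource, but it is the very same singleton.
            System.out.println("same instance: " + (scoped == longLived));
        } // scoped.close() runs here implicitly
        return longLived.isRunning();
    }

    public static void main(String[] args) {
        System.out.println(longLivedHolderSurvives()); // prints false
    }
}
{code}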

h3. Part 2: Long-Term Architectural Fix via Interface Segregation (*proposal*)
To prevent developers from accidentally invoking {{close()}} on the singleton 
instance in production or through unmanaged {{try-with-resources}} blocks, we 
should implement *Interface Segregation*.

1.) Convert {{ProfilingService}} into an interface containing all original 
public functional methods, excluding {{close()}}:
{code:java}
public interface ProfilingService {
    CompletableFuture<ProfilingInfo> requestProfiling(
            String resourceID, long duration, ProfilingInfo.ProfilingMode mode);

    CompletableFuture<Collection<ProfilingInfo>> getProfilingList(String resourceID);

    String getProfilingResultDir();
}
{code}

2.) Move the actual implementation details to a new concrete class named e.g. 
{{ProfilingServiceSingleton}} which implements both our interface and 
{{Closeable}}:
{code:java}
public class ProfilingServiceSingleton implements ProfilingService, Closeable {
    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile ProfilingServiceSingleton instance;

    // Original implementation (constructor, executor, close(), ...) stays here...

    public static ProfilingService getInstance(Configuration configs) {
        if (instance == null) {
            synchronized (ProfilingServiceSingleton.class) {
                if (instance == null) {
                    instance = new ProfilingServiceSingleton(configs);
                }
            }
        }
        return instance;
    }
}
{code}


By returning the {{ProfilingService}} interface type from {{getInstance()}}, 
the public {{close()}} method is cleanly hidden from production consumers.
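A reduced, hypothetical sketch of the split follows. It keeps only {{getProfilingResultDir()}} on the interface and uses a {{String}} instead of {{Configuration}}. One consequence worth noting: once {{getInstance()}} returns the interface type, the test-side reset from Part 1 can no longer call {{close()}} directly; the sketch assumes an explicit cast to {{ProfilingServiceSingleton}} in test code, which is an assumption rather than part of the proposal.

{code:java}
import java.io.Closeable;

// Hypothetical, reduced sketch of the proposed split (NOT the final Flink code):
// the interface keeps one method only, and a String stands in for Configuration.
final class InterfaceSegregationDemo {

    // Production-facing type: deliberately has no close().
    interface ProfilingService {
        String getProfilingResultDir();
    }

    static final class ProfilingServiceSingleton implements ProfilingService, Closeable {
        private static volatile ProfilingServiceSingleton instance;
        private final String resultDir;

        private ProfilingServiceSingleton(String resultDir) {
            this.resultDir = resultDir;
        }

        // Static return type is the interface, so ordinary callers cannot see close().
        static ProfilingService getInstance(String resultDir) {
            if (instance == null) {
                synchronized (ProfilingServiceSingleton.class) {
                    if (instance == null) {
                        instance = new ProfilingServiceSingleton(resultDir);
                    }
                }
            }
            return instance;
        }

        @Override
        public String getProfilingResultDir() {
            return resultDir;
        }

        @Override
        public void close() {
            synchronized (ProfilingServiceSingleton.class) {
                instance = null;
            }
        }
    }

    // Assumed test-only reset: reaching close() now requires an explicit cast.
    static String resetThenReacquire(String tempDir) {
        ProfilingService service = ProfilingServiceSingleton.getInstance(tempDir);
        // service.close(); // would not compile: ProfilingService declares no close()
        ((ProfilingServiceSingleton) service).close();
        return ProfilingServiceSingleton.getInstance(tempDir).getProfilingResultDir();
    }

    public static void main(String[] args) {
        ProfilingServiceSingleton.getInstance("/tmp"); // polluted by an earlier caller
        System.out.println(resetThenReacquire("/tmp/junit7")); // prints /tmp/junit7
    }
}
{code}

The cast makes every lifecycle-ending call visible in code review, which is the point of hiding {{close()}} behind the interface.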

This requires changing the class name at exactly *3 production call sites* 
where {{.getInstance(...)}} is invoked:

{noformat}
org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingHandler
org.apache.flink.runtime.rest.handler.cluster.JobManagerProfilingListHandler
org.apache.flink.runtime.taskexecutor.TaskExecutor
{noformat}

They keep compiling against the interface type; only the class name in the 
{{getInstance(...)}} call changes:
{code:java}
// Keeps production code safe from accidental lifecycle termination
// this.profilingService = ProfilingService.getInstance(configuration);          // <-- old class name
this.profilingService = ProfilingServiceSingleton.getInstance(configuration); // <-- new class name
{code}

Since {{ProfilingService::close}} is currently not called anywhere in 
production, this change is backwards-compatible for production code. Combined 
with Part 1, it eliminates a dangerous anti-pattern and removes this source of 
flakiness from the unit tests.


h2. Testing & Verification
To verify the proposed fix and explore the exact conditions under which this 
shared-state flakiness manifests, I executed a series of stress tests using a 
*single Maven fork* ({{-Dflink.forkCountUnitTest=1}}).
This forces all targeted test classes into a single, shared JVM environment 
where the static singleton instance issue can be actively reproduced and 
monitored.

h3. Preparation Step (POM Modification)
Before running the commands, I commented out the hardcoded 
{{<junit.jupiter.execution.parallel.*>}} system property variables under the 
maven-surefire-plugin configuration block in the root {{pom.xml}}.
This was necessary to cleanly pass customized parallel execution flags 
externally using the {{-Dsurefire.module.config}} property.

h3. Test Scenario 1: Single JVM, Concurrent-Class Sequential Execution (1 Thread)
In this scenario, I configured JUnit 5 to enable parallel class execution but 
throttled the underlying fixed thread pool down to exactly 1 thread 
({{max-pool-size=1}}, {{parallelism=1}}).
This forces the classes to run sequentially on a single thread while using the 
concurrent class scheduling mechanism.


*Execution Command:*
{noformat}
./mvnw test -pl flink-runtime \
  -Dfast -Djdk17 -Pjava17-target -fae \
  -Dgit.commit.id.skip=true -Dmaven.gitcommitid.skip=true \
  -Dflink.forkCountUnitTest=1 \
  -Dsurefire.module.config="--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
-Djunit.jupiter.execution.parallel.enabled=true 
-Djunit.jupiter.execution.parallel.mode.classes.default=concurrent 
-Djunit.jupiter.execution.parallel.mode.default=same_thread 
-Djunit.jupiter.execution.parallel.config.strategy=fixed 
-Djunit.jupiter.execution.parallel.config.fixed.max-pool-size=1 
-Djunit.jupiter.execution.parallel.config.fixed.saturate=true 
-Djunit.jupiter.execution.parallel.config.fixed.parallelism=1" \
  
-Dtest=ProfilingServiceTest,ShuffleMasterTest,TaskExecutorOperatorEventHandlingTest,TaskManagerRunnerStartupTest,TaskExecutorPartitionLifecycleTest,TaskManagerRunnerTest,TaskExecutorSlotLifetimeTest,TaskExecutorExecutionDeploymentReconciliationTest,TaskExecutorSubmissionTest,TaskExecutorRecoveryTest,TaskExecutorTest,PartialConsumePipelinedResultTest,LeaderChangeClusterComponentsTest,SlotCountExceedingParallelismTest
{noformat}

*Observation (Before Fix):*
As expected, the tests executing ahead of {{ProfilingServiceTest}} pollute the 
JVM-wide static singleton.
When {{ProfilingServiceTest}} eventually executes, its setup configuration is 
ignored, and the test fails on the rolling deletion assertion as the path 
defaults back to {{/tmp}}.

h3. Test Scenario 2: Single JVM, High Parallelism Stress Test (200 Threads)
Next, I ramped up the configuration to test heavy concurrent execution within 
the same JVM by increasing both {{fixed.max-pool-size}} and 
{{fixed.parallelism}} to *200*.

*Observation (Before Fix - Severe Flakiness Masking):*

Under heavy multi-threaded contention, I observed a highly dangerous side 
effect of the {{close()}} anti-pattern.
Out of the *5* test cases defined inside {{ProfilingServiceTest}}, *only one 
test case actually executed*, while the remaining 4 were silently dropped. To 
make matters worse, Maven marked the entire run as a success (green build):

{noformat}
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 27.55 s -- in org.apache.flink.runtime.util.profiler.ProfilingServiceTest
{noformat}

This shows that, without strict isolation and proper interface boundaries, the 
current {{ProfilingService::close}} logic can mask dropped test executions 
during concurrent runs, reporting successful builds despite reduced test 
coverage.

h2. Verification Results After Patch
After applying the proposed changes (*Interface Segregation* via 
{{ProfilingServiceSingleton}} + {{@Isolated}} on {{ProfilingServiceTest}} + 
*Explicit* {{close()}} state reset in {{setUp}}):

*Scenario 1 (Single Thread / Shared JVM):* Pass. Every preceding test runs 
successfully, and {{ProfilingServiceTest}} cleanly resets the state to work 
inside its assigned {{@TempDir}}.

*Scenario 2 (200 Threads / High Contention):* Pass. All 5 test cases inside 
{{ProfilingServiceTest}} are safely quarantined, execute successfully in their 
entirety, and no longer swallow test executions or pollute the surrounding 
environment.


h2. Previous JIRAs Related to This Issue
FLINK-38442: ProfilingServiceTest.testRollingDeletion is unstable (Resolved)
FLINK-35571: ProfilingServiceTest.testRollingDeletion intermittently fails due to improper test isolation (Open)
FLINK-34013: ProfilingServiceTest.testRollingDeletion is unstable on AZP (Resolved)





--
This message was sent by Atlassian Jira
(v8.20.10#820010)
