Copilot commented on code in PR #10256:
URL: https://github.com/apache/seatunnel/pull/10256#discussion_r2652060980


##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -228,59 +230,60 @@ void testCleanupRunningJobStateIMap() {
                                     coordinatorService.getJobMaster(jobInformation.jobId);
                             // job master should be null
                             Assertions.assertNull(jobMaster);
-                            Assertions.assertTrue(runningJobStateIMap.isEmpty());
+                            Assertions.assertTrue(runningJobStateStateStore.isEmpty());
                         });
 
         jobInformation.coordinatorService.clearCoordinatorService();
         jobInformation.coordinatorServiceTest.shutdown();
     }
 
     @Test
-    void testCleanupMetricsImap() {
+    void testCleanupMetricsStateStore() {
         JobInformation jobInformation =
                 submitJob(
-                        "CoordinatorServiceTest_testCleanupMetricsImap",
+                        "CoordinatorServiceTest_testCleanupMetricsStateStore",
                         "batch_fake_to_console.conf",
-                        "test_cleanup_metrics_imap");
+                        "test_cleanup_metrics_map_storage");
         CoordinatorService coordinatorService = jobInformation.coordinatorService;
-        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap =
-                coordinatorService.getMetricsImap();
+        StateStore<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsStateStore =
+                coordinatorService.getMetricsStateStore();
         await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertFalse(metricsImap.isEmpty()));
+                .untilAsserted(() -> Assertions.assertFalse(metricsStateStore.isEmpty()));
         await().atMost(10000, TimeUnit.MILLISECONDS)
-                .untilAsserted(() -> Assertions.assertTrue(metricsImap.isEmpty()));
+                .untilAsserted(() -> Assertions.assertTrue(metricsStateStore.isEmpty()));
 
         jobInformation.coordinatorService.clearCoordinatorService();
         jobInformation.coordinatorServiceTest.shutdown();
     }
 
     @Test
-    void testCleanupMetricsImapWithPartitionConfig() {
+    void testCleanupMetricsStateStoreWithPartitionConfig() {
         setConfigFile("seatunnel_multiple_metrics_key.yaml");
 
         JobInformation jobInformation =
                 submitJob(
-                        "CoordinatorServiceTest_testCleanupMetricsImapWithPartitionConfig",
+                        "CoordinatorServiceTest_testCleanupMetricsStateStoreWithPartitionConfig",
                         "batch_fake_to_console.conf",
-                        "test_cleanup_metrics_imap_with_partition_config");
+                        "test_cleanup_metrics_map_storage_with_partition_config");

Review Comment:
   The test name uses "map_storage_with_partition_config" which is inconsistent 
with the StateStore abstraction. For naming consistency, this should be 
"test_cleanup_metrics_state_store_with_partition_config" to match the 
StateStore terminology used throughout this PR.
   ```suggestion
                           "test_cleanup_metrics_state_store_with_partition_config");
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java:
##########
@@ -59,14 +62,15 @@ public ILogger getLogger() {
     }
 
     public SeaTunnelMetricsContext getOrCreateMetricsContext(TaskLocation taskLocation) {
-        IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> map =
-                nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+        StateStore<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> map =
+                distributedStoreManager.getMap(Constant.IMAP_RUNNING_JOB_METRICS);
         int partitionCount =
                 taskExecutionService
                         .getSeaTunnelConfig()
                         .getEngineConfig()
                         .getJobMetricsPartitionCount();
-        long partition = SeaTunnelServer.getMetricsImapPartition(taskLocation, partitionCount);
+        long partition =
+                SeaTunnelServer.getMetricsMapStoragePartition(taskLocation, partitionCount);

Review Comment:
   The method name "getMetricsMapStoragePartition" uses "MapStorage" 
terminology which is inconsistent with the StateStore abstraction introduced in 
this PR. For consistency, this should be renamed to 
"getMetricsStateStorePartition" to align with the StateStore naming convention 
used throughout the codebase.
   ```suggestion
                   SeaTunnelServer.getMetricsStateStorePartition(taskLocation, partitionCount);
   ```
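
For context, here is a minimal sketch of what a partition helper with this signature shape might do, assuming a plain hash-modulo scheme. The class name, the `Object` key type, and the modulo logic are assumptions for illustration only; they are not taken from `SeaTunnelServer`, whose actual implementation is not shown in this diff.

```java
class MetricsPartitionSketch {
    // Hypothetical helper: maps a key to a partition index in [0, partitionCount).
    // Math.floorMod keeps the result non-negative even when hashCode() is negative.
    static long getMetricsStateStorePartition(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The same key always lands in the same partition.
        System.out.println(getMetricsStateStorePartition("task-1", 4));
    }
}
```

Whatever the real scheme is, the rename only affects the method name; callers such as `getOrCreateMetricsContext` keep passing the task location and the configured partition count.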



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java:
##########
@@ -47,10 +46,10 @@
 
 @DisabledOnOs(OS.WINDOWS)
 @Disabled
-public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
+class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
 
     @Test
-    public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException {
+    void testHAByMapStorageCheckpointIDCounter() throws CheckpointStorageException {

Review Comment:
   The method name has been renamed from `testHAByIMapCheckpointIDCounter` to 
`testHAByMapStorageCheckpointIDCounter`, but "MapStorage" is not an accurate 
description of the abstraction being introduced. The abstraction is called 
"StateStore", not "MapStorage". The method should be named 
`testHAByStateStoreCheckpointIDCounter` to accurately reflect the StateStore 
abstraction.
   ```suggestion
       void testHAByStateStoreCheckpointIDCounter() throws CheckpointStorageException {
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -225,15 +226,15 @@ public List<SubPlan> getPipelineList() {
     }
 
     private void updateStateTimestamps(@NonNull JobStatus targetState) {
-        // we must update runningJobStateTimestampsIMap first and then can update
-        // runningJobStateIMap
-        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(jobId);
+        // we must update runningJobStateTimestampsStateStore first and then can update
+        // runningJobStateStateStore

Review Comment:
   The removed comment referred to "runningJobStateTimestampsIMap"; the added 
comment already names "runningJobStateTimestampsStateStore" and 
"runningJobStateStateStore". The wording could additionally state why the 
ordering matters under the new StateStore abstraction.
   ```suggestion
           // We must update runningJobStateTimestampsStateStore in the StateStore abstraction first
           // and then update runningJobStateStateStore to keep the job state and its timestamps consistent.
   ```
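
The ordering that this code comment describes can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: the two `ConcurrentHashMap` fields are hypothetical stand-ins for the two StateStore instances, and the class and method names are invented for the example. The presumed intent is that a reader who observes a new state can always find its timestamp.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StateOrderingSketch {
    // Hypothetical stand-ins for runningJobStateTimestampsStateStore and
    // runningJobStateStateStore; plain maps keep the sketch self-contained.
    static final Map<Long, Long[]> timestamps = new ConcurrentHashMap<>();
    static final Map<Long, String> states = new ConcurrentHashMap<>();

    // Records the transition time first, then publishes the new state.
    static void transition(long jobId, int stateOrdinal, String stateName, int stateCount) {
        Long[] stamps = timestamps.computeIfAbsent(jobId, id -> new Long[stateCount]);
        stamps[stateOrdinal] = System.currentTimeMillis();
        timestamps.put(jobId, stamps); // step 1: write the timestamps back
        states.put(jobId, stateName);  // step 2: only then publish the state
    }

    public static void main(String[] args) {
        transition(1L, 0, "INITIALIZING", 3);
        transition(1L, 1, "RUNNING", 3);
        System.out.println(states.get(1L) + " at " + timestamps.get(1L)[1]);
    }
}
```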



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -136,30 +140,32 @@ public PhysicalVertex(
             @NonNull JobImmutableInformation jobImmutableInformation,
             long initializationTimestamp,
             @NonNull NodeEngine nodeEngine,
-            @NonNull IMap runningJobStateIMap,
-            @NonNull IMap runningJobStateTimestampsIMap) {
+            @NonNull StateStore runningJobStateStateStore,
+            @NonNull StateStore runningJobStateTimestampsStateStore) {
+        this.distributedStoreManager = new DistributedStoreManager((NodeEngineImpl) nodeEngine);
         this.taskGroupLocation = taskGroup.getTaskGroupLocation();
         this.taskGroup = taskGroup;
         this.flakeIdGenerator = flakeIdGenerator;
         this.pluginJarsUrls = pluginJarsUrls;
         this.connectorJarIdentifiers = connectorJarIdentifiers;
 
         Long[] stateTimestamps = new Long[ExecutionState.values().length];
-        if (runningJobStateTimestampsIMap.get(taskGroup.getTaskGroupLocation()) == null) {
+        if (runningJobStateTimestampsStateStore.get(taskGroup.getTaskGroupLocation()) == null) {
             stateTimestamps[ExecutionState.INITIALIZING.ordinal()] = initializationTimestamp;
-            runningJobStateTimestampsIMap.put(taskGroup.getTaskGroupLocation(), stateTimestamps);
+            runningJobStateTimestampsStateStore.put(
+                    taskGroup.getTaskGroupLocation(), stateTimestamps);
         }
 
-        if (runningJobStateIMap.get(taskGroupLocation) == null) {
-            // we must update runningJobStateTimestampsIMap first and then can update
-            // runningJobStateIMap
+        if (runningJobStateStateStore.get(taskGroupLocation) == null) {
+            // we must update runningJobStateTimestampsStateStore first and then can update
+            // runningJobStateStateStore

Review Comment:
   The removed comment referred to "runningJobStateTimestampsIMap" and 
"runningJobStateIMap"; the added comment already uses 
"runningJobStateTimestampsStateStore" and "runningJobStateStateStore", which 
matches the new StateStore abstraction. Keep this wording consistent with the 
equivalent comments elsewhere in the PR.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -448,11 +457,11 @@ private void noticeTaskExecutionServiceCancel() {
     }
 
     private void updateStateTimestamps(@NonNull ExecutionState targetState) {
-        // we must update runningJobStateTimestampsIMap first and then can update
-        // runningJobStateIMap
-        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(taskGroupLocation);
+        // we must update runningJobStateTimestampsStateStore first and then can update
+        // runningJobStateStateStore

Review Comment:
   The removed comment referred to "runningJobStateTimestampsIMap" and 
"runningJobStateIMap"; the added comment already uses 
"runningJobStateTimestampsStateStore" and "runningJobStateStateStore", which 
matches the new StateStore abstraction. Keep this wording consistent with the 
equivalent comments elsewhere in the PR.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -597,42 +598,43 @@ public void handleCheckpointError(long pipelineId, boolean neverRestore) {
                         });
     }
 
-    private void removeJobIMap() {
+    private void removeJobMapStorage() {

Review Comment:
   The method name "removeJobMapStorage" uses "MapStorage" terminology which is 
inconsistent with the StateStore abstraction. For consistency with the new 
abstraction, this should be renamed to "removeJobStateStore" to align with 
StateStore naming conventions used throughout this PR.
   ```suggestion
       private void removeJobStateStore() {
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -1112,8 +1114,8 @@ protected void completeSchemaChangeAfterCheckpoint(CompletedCheckpoint checkpoin
         }
     }
 
-    public String getCheckpointStateImapKey() {
-        return checkpointStateImapKey;
+    public String getCheckpointStateStateStoreKey() {

Review Comment:
   The method name "getCheckpointStateStateStoreKey" repeats "State" 
(checkpointState + StateStore), which is redundant and confusing. Since this 
method returns a key used to identify checkpoint coordinator state in the 
StateStore, it should be renamed to "getCheckpointCoordinatorStateKey" or 
"getCheckpointStateKey" for clarity and to avoid the repetition.
   ```suggestion
       public String getCheckpointCoordinatorStateKey() {
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -597,42 +598,43 @@ public void handleCheckpointError(long pipelineId, boolean neverRestore) {
                         });
     }
 
-    private void removeJobIMap() {
+    private void removeJobMapStorage() {
         Long jobId = getJobImmutableInformation().getJobId();
-        runningJobStateTimestampsIMap.remove(jobId);
+        runningJobStateTimestampsStateStore.remove(jobId);
 
         getPhysicalPlan()
                 .getPipelineList()
                 .forEach(
                         pipeline -> {
-                            runningJobStateIMap.remove(pipeline.getPipelineLocation());
-                            runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
+                            runningJobStateStateStore.remove(pipeline.getPipelineLocation());
+                            runningJobStateTimestampsStateStore.remove(
+                                    pipeline.getPipelineLocation());
                             pipeline.getCoordinatorVertexList()
                                     .forEach(
                                             coordinator -> {
-                                                runningJobStateIMap.remove(
+                                                runningJobStateStateStore.remove(
                                                         coordinator.getTaskGroupLocation());
-                                                runningJobStateTimestampsIMap.remove(
+                                                runningJobStateTimestampsStateStore.remove(
                                                         coordinator.getTaskGroupLocation());
                                             });
 
                             pipeline.getPhysicalVertexList()
                                     .forEach(
                                             task -> {
-                                                runningJobStateIMap.remove(
+                                                runningJobStateStateStore.remove(
                                                         task.getTaskGroupLocation());
-                                                runningJobStateTimestampsIMap.remove(
+                                                runningJobStateTimestampsStateStore.remove(
                                                         task.getTaskGroupLocation());
                                             });
 
-                            String checkpointStateImapKey =
+                            String checkpointStateStateStoreKey =
                                     checkpointManager
                                             .getCheckpointCoordinator(pipeline.getPipelineId())
-                                            .getCheckpointStateImapKey();
-                            runningJobStateIMap.remove(checkpointStateImapKey);
+                                            .getCheckpointStateStateStoreKey();

Review Comment:
   The variable name "checkpointStateStateStoreKey" repeats "State" 
(checkpointState + StateStore), which is redundant and confusing. Since this 
key is used to identify checkpoint coordinator state in the StateStore, it 
should be named "checkpointCoordinatorStateKey" or "checkpointStateKey" to be 
clearer and avoid the repetition.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -414,7 +417,7 @@ public void removeMetrics(PipelineLocation pipelineLocation) {
                         });
     }
 
-    public static long getMetricsImapPartition(TaskLocation key, int partitionCount) {
+    public static long getMetricsMapStoragePartition(TaskLocation key, int partitionCount) {

Review Comment:
   The public method name "getMetricsMapStoragePartition" uses "MapStorage" 
terminology which is inconsistent with the StateStore abstraction introduced in 
this PR. Since this is a public method, it should be renamed to 
"getMetricsStateStorePartition" to align with the StateStore naming convention. 
This ensures API consistency with the new abstraction layer.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -415,11 +416,11 @@ private synchronized void reset() throws Exception {
     }
 
     private void updateStateTimestamps(@NonNull PipelineStatus targetState) {
-        // we must update runningJobStateTimestampsIMap first and then can update
-        // runningJobStateIMap
-        Long[] stateTimestamps = runningJobStateTimestampsIMap.get(pipelineLocation);
+        // we must update runningJobStateTimestampsMap first and then can update
+        // runningJobStateMap

Review Comment:
   The comment still refers to "runningJobStateTimestampsMap" and 
"runningJobStateMap" but the actual implementation uses 
"runningJobStateTimestampsStateStore" and "runningJobStateStateStore". The 
comment should be updated to reference the StateStore variants to maintain 
consistency with the actual code.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -1008,26 +1010,26 @@ public CompletableFuture<Boolean> savePoint() {
     public void setOwnedSlotProfiles(
             @NonNull PipelineLocation pipelineLocation,
             @NonNull Map<TaskGroupLocation, SlotProfile> pipelineOwnedSlotProfiles) {
-        ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles);
+        ownedSlotProfilesStateStore.put(pipelineLocation, pipelineOwnedSlotProfiles);
         try {
             RetryUtils.retryWithException(
                     () ->
                             pipelineOwnedSlotProfiles.equals(
-                                    ownedSlotProfilesIMap.get(pipelineLocation)),
+                                    ownedSlotProfilesStateStore.get(pipelineLocation)),
                     new RetryUtils.RetryMaterial(
                             Constant.OPERATION_RETRY_TIME,
                             true,
                             exception -> exception instanceof NullPointerException && isRunning,
                             Constant.OPERATION_RETRY_SLEEP));
         } catch (Exception e) {
             throw new SeaTunnelEngineException(
-                    "Can not sync pipeline owned slot profiles with IMap", e);
+                    "Can not sync pipeline owned slot profiles with StateStore", e);

Review Comment:
   The removed error message referred to "IMap", which no longer describes the 
abstraction layer in use. The added message, "Can not sync pipeline owned slot 
profiles with StateStore", correctly reflects the new StateStore abstraction.
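
The write-then-verify pattern in this hunk can be sketched in isolation as shown below. This is a simplified illustration, not the PR's code: it uses a plain loop instead of `RetryUtils.retryWithException`, a `ConcurrentHashMap` as a hypothetical stand-in for the StateStore, and invented class, method, and parameter names. Unlike the original, it retries on a failed equality check rather than on a `NullPointerException` while the job is running.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class PutAndVerifySketch {
    // Evaluates `check` up to maxAttempts times, sleeping between attempts.
    // Returns false if the check never passes or the thread is interrupted.
    static boolean retryUntilTrue(Supplier<Boolean> check, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (Boolean.TRUE.equals(check.get())) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }

    // Writes the value, then reads it back until the store returns the same value.
    static <K, V> void putAndVerify(Map<K, V> store, K key, V value) {
        store.put(key, value);
        if (!retryUntilTrue(() -> value.equals(store.get(key)), 3, 10L)) {
            throw new IllegalStateException("Can not sync value with StateStore");
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new ConcurrentHashMap<>();
        putAndVerify(store, "pipeline-1", "slot-profiles");
        System.out.println(store.get("pipeline-1"));
    }
}
```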



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -361,13 +362,13 @@ public synchronized void updatePipelineState(@NonNull PipelineStatus targetState
             }
 
             // now do the actual state transition
-            // we must update runningJobStateTimestampsIMap first and then can update
-            // runningJobStateIMap
+            // we must update runningJobStateTimestampsMap first and then can update
+            // runningJobStateMap

Review Comment:
   The comment still refers to "runningJobStateTimestampsMap" and 
"runningJobStateMap" but the actual implementation uses 
"runningJobStateTimestampsStateStore" and "runningJobStateStateStore". The 
comment should be updated to reference the StateStore variants to maintain 
consistency with the actual code.
   ```suggestion
               // we must update runningJobStateTimestampsStateStore first and then can update
               // runningJobStateStateStore
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java:
##########
@@ -258,12 +259,13 @@ public synchronized void updateJobState(@NonNull JobStatus targetState) {
                 throw new SeaTunnelEngineException(message);
             }
 
-            // Now do the actual state transition, we must update runningJobStateTimestampsIMap
-            // first and then can update runningJobStateIMap
+            // Now do the actual state transition, we must update
+            // runningJobStateTimestampsStateStore
+            // first and then can update runningJobStateStateStore

Review Comment:
   The removed comment referred to "runningJobStateTimestampsIMap" and 
"runningJobStateIMap"; the added comment already uses 
"runningJobStateTimestampsStateStore" and "runningJobStateStateStore", which 
matches the new StateStore abstraction. Keep this wording consistent with the 
equivalent comments elsewhere in the PR.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java:
##########
@@ -90,40 +90,40 @@ public class JobHistoryService {
      */
     private final Map<Long, PendingJobInfo> pendingJobInfoMap;
 
-    /** finishedJobVertexInfoImap key is jobId and value is JobDAGInfo */
-    private final IMap<Long, JobDAGInfo> finishedJobDAGInfoImap;
+    /** finishedJoDAGInfoStateStore key is jobId and value is JobDAGInfo */

Review Comment:
   The JavaDoc comment has a typo: "finishedJoDAGInfoStateStore" is missing the 
'b' in "Job". It should be "finishedJobDAGInfoStateStore" to correctly describe 
the field name.
   ```suggestion
       /** finishedJobDAGInfoStateStore key is jobId and value is JobDAGInfo */
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -1058,8 +1060,8 @@ private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targ
         } catch (Exception e) {
             LOG.warn(
                     String.format(
-                            "Set %s state %s to IMap failed, skip do it",
-                            checkpointStateImapKey, targetStatus));
+                            "Set %s state %s to StateStore failed, skip do it",
+                            checkpointStateStateStoreKey, targetStatus));

Review Comment:
   The removed log message referred to "IMap", which no longer describes the 
abstraction layer in use. The added message, "Set %s state %s to StateStore 
failed, skip do it", correctly reflects the new StateStore abstraction.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -193,15 +193,17 @@ void testCleanupPendingJobMasterMapAfterJobFailed() {
     }
 
     @Test
-    void testCleanupRunningJobStateIMap() {
+    void testCleanupRunningJobStateStateStore() {
         JobInformation jobInformation =
                 submitJob(
-                        "CoordinatorServiceTest_testCleanupRunningJobStateIMap",
+                        "CoordinatorServiceTest_testCleanupRunningJobStateStateStore",
                         "batch_fake_to_console.conf",
-                        "test_cleanup_running_job_state_imap");
+                        "test_cleanup_running_job_state_map_storage");

Review Comment:
   The third argument to submitJob was changed to use "map_storage", but this 
is inconsistent with the abstraction being introduced, which is called 
"StateStore", not "MapStorage". For consistency, this should be changed to 
"test_cleanup_running_job_state_store" to match the naming convention used 
elsewhere (e.g., "StateStore", "runningJobStateStateStore").
   ```suggestion
                           "test_cleanup_running_job_state_store");
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java:
##########
@@ -147,21 +147,21 @@ public SubPlan(
                                                         .defaultValue())
                                 .toString());
         Long[] stateTimestamps = new Long[PipelineStatus.values().length];
-        if (runningJobStateTimestampsIMap.get(pipelineLocation) == null) {
+        if (runningJobStateTimestampsStateStore.get(pipelineLocation) == null) {
             stateTimestamps[PipelineStatus.INITIALIZING.ordinal()] = initializationTimestamp;
-            runningJobStateTimestampsIMap.put(pipelineLocation, stateTimestamps);
+            runningJobStateTimestampsStateStore.put(pipelineLocation, stateTimestamps);
         }
 
-        if (runningJobStateIMap.get(pipelineLocation) == null) {
-            // we must update runningJobStateTimestampsIMap first and then can update
-            // runningJobStateIMap
+        if (runningJobStateStateStore.get(pipelineLocation) == null) {
+            // we must update runningJobStateTimestampsMap first and then can update
+            // runningJobStateMap

Review Comment:
   The comment still refers to "runningJobStateTimestampsMap" but the actual 
implementation uses "runningJobStateTimestampsStateStore". The comment should 
be updated to reference "runningJobStateTimestampsStateStore" to match the 
actual code and maintain consistency.
   ```suggestion
               // we must update runningJobStateTimestampsStateStore first and then can update
               // runningJobStateStateStore
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java:
##########
@@ -147,13 +147,13 @@ public class CheckpointCoordinator {
 
     private AtomicReference<String> errorByPhysicalVertex = new AtomicReference<>();
 
-    private final IMap<Object, Object> runningJobStateIMap;
+    private final StateStore<Object, Object> runningJobStateStateStore;
 
     // save pending checkpoint for savepoint, to make sure the different savepoint request can be
     // processed with one savepoint operation in the same time.
     private PendingCheckpoint savepointPendingCheckpoint;
 
-    private final String checkpointStateImapKey;
+    private final String checkpointStateStateStoreKey;

Review Comment:
   The variable name "checkpointStateStateStoreKey" repeats "State" 
(checkpointState + StateStore), which is redundant and confusing. Since this 
key identifies checkpoint coordinator state in the StateStore, it should be 
renamed to "checkpointCoordinatorStateKey" or "checkpointStateKey" for clarity 
and to avoid repetition.



##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java:
##########
@@ -228,59 +230,60 @@ void testCleanupRunningJobStateIMap() {
                                     coordinatorService.getJobMaster(jobInformation.jobId);
                             // job master should be null
                             Assertions.assertNull(jobMaster);
-                            Assertions.assertTrue(runningJobStateIMap.isEmpty());
+                            Assertions.assertTrue(runningJobStateStateStore.isEmpty());
                         });
 
         jobInformation.coordinatorService.clearCoordinatorService();
         jobInformation.coordinatorServiceTest.shutdown();
     }
 
     @Test
-    void testCleanupMetricsImap() {
+    void testCleanupMetricsStateStore() {
         JobInformation jobInformation =
                 submitJob(
-                        "CoordinatorServiceTest_testCleanupMetricsImap",
+                        "CoordinatorServiceTest_testCleanupMetricsStateStore",
                         "batch_fake_to_console.conf",
-                        "test_cleanup_metrics_imap");
+                        "test_cleanup_metrics_map_storage");

Review Comment:
   The test name uses "map_storage" which is inconsistent with the StateStore 
abstraction. For naming consistency, this should be 
"test_cleanup_metrics_state_store" to match the StateStore terminology used 
throughout this PR.
   ```suggestion
                           "test_cleanup_metrics_state_store");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
