Myasuka commented on code in PR #23820:
URL: https://github.com/apache/flink/pull/23820#discussion_r1423532806


##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {

Review Comment:
   You can make this class extend `TestLogger`, so that you have a `log`
to print logs.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import one.profiler.AsyncProfiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Create and keep profiling requests with rolling policy. */
+public class ProfilingService implements Closeable {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(ProfilingService.class);
+    private static final SimpleDateFormat sdf = new 
SimpleDateFormat("yyyy-MM-dd_HH_mm_ss");
+    private static ProfilingService instance;
+    private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap;
+    private final String profilingResultDir;
+    private final int historySizeLimit;
+    private final ScheduledExecutorService scheduledExecutor;
+    private ProfilingFuture profilingFuture;
+
+    private ProfilingService(Configuration configs) {
+        this.profilingMap = new HashMap<>();
+        this.historySizeLimit = 
configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE);
+        Preconditions.checkArgument(
+                historySizeLimit > 0,
+                String.format(
+                        "Configured %s must be positive.",
+                        RestOptions.MAX_PROFILING_HISTORY_SIZE.key()));
+        this.profilingResultDir = 
configs.getString(RestOptions.PROFILING_RESULT_DIR);
+        this.scheduledExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory.Builder()
+                                .setPoolName("flink-profiling-service")
+                                .build());
+    }
+
+    public static ProfilingService getInstance(Configuration configs) {
+        if (instance == null) {
+            instance = new ProfilingService(configs);
+        }
+        return instance;
+    }
+
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            String resourceID, long duration, ProfilingInfo.ProfilingMode 
mode) {
+        if (profilingFuture != null && !profilingFuture.isDone()) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(resourceID + " is still under 
profiling."));
+        }
+        ProfilingInfo profilingInfo = ProfilingInfo.create(duration, mode);
+        profilingMap.putIfAbsent(resourceID, new ArrayDeque<>());
+        profilingMap.get(resourceID).addFirst(profilingInfo);
+        AsyncProfiler profiler = AsyncProfiler.getInstance();
+        try {
+            String response =
+                    profiler.execute(
+                            ProfilerConstants.COMMAND_START.msg
+                                    + 
profilingInfo.getProfilingMode().getCode());
+            if (StringUtils.isNullOrWhitespaceOnly(response)
+                    || 
!response.startsWith(ProfilerConstants.PROFILER_STARTED_SUCCESS.msg)) {
+                return CompletableFuture.completedFuture(
+                        profilingInfo.fail("Start profiler failed. " + 
response));
+            }
+        } catch (Exception e) {
+            return CompletableFuture.completedFuture(
+                    profilingInfo.fail("Start profiler failed. " + e));
+        }
+
+        scheduledExecutor.schedule(() -> stopProfiling(resourceID), duration, 
TimeUnit.SECONDS);
+
+        this.profilingFuture = new ProfilingFuture(duration, () -> 
stopProfiling(resourceID));
+        return CompletableFuture.completedFuture(profilingInfo);
+    }
+
+    private void stopProfiling(String resourceID) {
+        AsyncProfiler profiler = AsyncProfiler.getInstance();
+        ArrayDeque<ProfilingInfo> profilingList = profilingMap.get(resourceID);
+        Preconditions.checkState(!CollectionUtil.isNullOrEmpty(profilingList));
+        ProfilingInfo info = profilingList.getFirst();
+        try {
+            String fileName = formatOutputFileName(resourceID, info);
+            String outputPath = new File(profilingResultDir, 
fileName).getPath();
+            String response = 
profiler.execute(ProfilerConstants.COMMAND_STOP.msg + outputPath);
+            if (!StringUtils.isNullOrWhitespaceOnly(response)
+                    && 
response.startsWith(ProfilerConstants.PROFILER_STOPPED_SUCCESS.msg)) {
+                info.success(fileName);
+            } else {
+                info.fail("Stop profiler failed. " + response);
+            }
+            rollingClearing(profilingList);
+        } catch (Throwable e) {
+            info.fail("Stop profiler failed. " + e);
+        }
+    }
+
+    private void rollingClearing(ArrayDeque<ProfilingInfo> profilingList) {
+        while (profilingList.size() > historySizeLimit) {
+            ProfilingInfo info = profilingList.pollLast();
+            String outputFile = info != null ? info.getOutputFile() : "";
+            if (StringUtils.isNullOrWhitespaceOnly(outputFile)) {
+                continue;
+            }
+            try {
+                Files.deleteIfExists(Paths.get(profilingResultDir, 
outputFile));
+            } catch (Exception e) {
+                LOG.error(String.format("Clearing file for %s failed. 
Skipped.", info), e);
+            }
+        }
+    }
+
+    private String formatOutputFileName(String resourceID, ProfilingInfo info) 
{
+        return String.format(
+                "%s_%s_%s.html", resourceID, info.getProfilingMode(), 
sdf.format(new Date()));
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (profilingFuture != null && !profilingFuture.isDone()) {
+                profilingFuture.cancel();
+            }
+            scheduledExecutor.shutdownNow();

Review Comment:
   Can we add some checks here?
   
   ~~~java
               if (!scheduledExecutor.isShutdown()) {
                   scheduledExecutor.shutdownNow();
               }
   ~~~



##########
flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import one.profiler.AsyncProfiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Create and keep profiling requests with rolling policy. */
+public class ProfilingService implements Closeable {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(ProfilingService.class);
+    private static final SimpleDateFormat sdf = new 
SimpleDateFormat("yyyy-MM-dd_HH_mm_ss");
+    private static ProfilingService instance;
+    private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap;
+    private final String profilingResultDir;
+    private final int historySizeLimit;
+    private final ScheduledExecutorService scheduledExecutor;
+    private ProfilingFuture profilingFuture;
+
+    private ProfilingService(Configuration configs) {
+        this.profilingMap = new HashMap<>();
+        this.historySizeLimit = 
configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE);
+        Preconditions.checkArgument(
+                historySizeLimit > 0,
+                String.format(
+                        "Configured %s must be positive.",
+                        RestOptions.MAX_PROFILING_HISTORY_SIZE.key()));
+        this.profilingResultDir = 
configs.getString(RestOptions.PROFILING_RESULT_DIR);
+        this.scheduledExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory.Builder()
+                                .setPoolName("flink-profiling-service")
+                                .build());
+    }
+
+    public static ProfilingService getInstance(Configuration configs) {
+        if (instance == null) {
+            instance = new ProfilingService(configs);
+        }
+        return instance;
+    }

Review Comment:
   I think this getter does not implement a real singleton; can we make it one?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();
+    }
+
+    @Test
+    void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+        ProfilingInfo profilingInfo =
+                profilingService
+                        .requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+                        .get();
+        Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+        try {
+            profilingService
+                    .requestProfiling(
+                            resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+                    .get();
+            Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+        }
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+        for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+            ProfilingInfo profilingInfo =
+                    profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+            if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+                LOG.warn(
+                        "Ignoring failed profiling instance in {} mode, which 
caused by no permission.",

Review Comment:
   I think there are more possible failure messages than just `no permission` here.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();
+    }
+
+    @Test
+    void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+        ProfilingInfo profilingInfo =
+                profilingService
+                        .requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+                        .get();
+        Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+        try {
+            profilingService
+                    .requestProfiling(
+                            resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+                    .get();
+            Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+        }
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+        for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+            ProfilingInfo profilingInfo =
+                    profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+            if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+                LOG.warn(
+                        "Ignoring failed profiling instance in {} mode, which 
caused by no permission.",
+                        profilingInfo.getProfilingMode());
+                continue;
+            }
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.RUNNING,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Submitting profiling request should be succeed or 
no permission, but got errorMsg=%s",
+                            profilingInfo.getMessage()));
+            waitForProfilingFinished(profilingService);
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.FINISHED,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Profiling request should complete successful, but 
got errorMsg=%s",
+                            profilingInfo.getMessage()));
+        }
+
+        verifyRollingDeletion(profilingService);
+    }
+
+    private void verifyConfigsWorks(ProfilingService profilingService, Path 
configuredDir) {
+        Assertions.assertEquals(configuredDir.toString(), 
profilingService.getProfilingResultDir());
+        Assertions.assertEquals(historySizeLimit, 
profilingService.getHistorySizeLimit());
+    }
+
+    private void verifyRollingDeletion(ProfilingService profilingService) {
+        ArrayDeque<ProfilingInfo> profilingList =
+                profilingService.getProfilingListForTest(resourceID);
+        // Profiling History shouldn't exceed history size limit.
+        Assertions.assertEquals(historySizeLimit, profilingList.size());
+        // Profiling History files should be rolling deleted.
+        Set<String> resultFileNames = new HashSet<>();
+        File configuredDir = new 
File(profilingService.getProfilingResultDir());
+        for (File f : Objects.requireNonNull(configuredDir.listFiles())) {
+            resultFileNames.add(f.getName());
+        }
+        Assertions.assertEquals(profilingList.size(), resultFileNames.size());
+        for (ProfilingInfo profilingInfo : profilingList) {
+            String outputFile = profilingInfo.getOutputFile();
+            Assertions.assertTrue(resultFileNames.contains(outputFile));
+        }
+    }
+
+    private void waitForProfilingFinished(ProfilingService profilingService) {
+        while (!profilingService.getProfilingFuture().isDone()) {}

Review Comment:
   It would be better to add a `Thread#sleep` inside this loop so that it does not busy-wait.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);

Review Comment:
   I don't think we need to verify the configs within `setUp`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();
+    }
+
+    @Test
+    void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+        ProfilingInfo profilingInfo =
+                profilingService
+                        .requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+                        .get();
+        Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+        try {
+            profilingService
+                    .requestProfiling(
+                            resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+                    .get();
+            Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+        }
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+        for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+            ProfilingInfo profilingInfo =
+                    profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+            if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+                LOG.warn(
+                        "Ignoring failed profiling instance in {} mode, which 
caused by no permission.",
+                        profilingInfo.getProfilingMode());
+                continue;
+            }
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.RUNNING,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Submitting profiling request should be succeed or 
no permission, but got errorMsg=%s",
+                            profilingInfo.getMessage()));
+            waitForProfilingFinished(profilingService);
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.FINISHED,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Profiling request should complete successful, but 
got errorMsg=%s",
+                            profilingInfo.getMessage()));
+        }
+
+        verifyRollingDeletion(profilingService);
+    }
+
+    private void verifyConfigsWorks(ProfilingService profilingService, Path 
configuredDir) {
+        Assertions.assertEquals(configuredDir.toString(), 
profilingService.getProfilingResultDir());
+        Assertions.assertEquals(historySizeLimit, 
profilingService.getHistorySizeLimit());
+    }
+
+    private void verifyRollingDeletion(ProfilingService profilingService) {
+        ArrayDeque<ProfilingInfo> profilingList =
+                profilingService.getProfilingListForTest(resourceID);
+        // Profiling History shouldn't exceed history size limit.
+        Assertions.assertEquals(historySizeLimit, profilingList.size());
+        // Profiling History files should be rolling deleted.
+        Set<String> resultFileNames = new HashSet<>();
+        File configuredDir = new 
File(profilingService.getProfilingResultDir());
+        for (File f : Objects.requireNonNull(configuredDir.listFiles())) {
+            resultFileNames.add(f.getName());
+        }
+        Assertions.assertEquals(profilingList.size(), resultFileNames.size());
+        for (ProfilingInfo profilingInfo : profilingList) {
+            String outputFile = profilingInfo.getOutputFile();
+            Assertions.assertTrue(resultFileNames.contains(outputFile));
+        }
+    }
+
+    private void waitForProfilingFinished(ProfilingService profilingService) {
+        while (!profilingService.getProfilingFuture().isDone()) {}
+    }
+
+    /**
+     * Check profiling instance failed caused by no permission to perf_events 
or missing of JDK
+     * debug symbols.
+     *
+     * @return true if no permission to access perf_events.

Review Comment:
   ```suggestion
        * @return true if no permission to access perf_events or no AllocTracer 
symbols found.
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();
+    }
+
+    @Test
+    void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+        ProfilingInfo profilingInfo =
+                profilingService
+                        .requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+                        .get();
+        Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+        try {
+            profilingService
+                    .requestProfiling(
+                            resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+                    .get();
+            Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+        }
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+        for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+            ProfilingInfo profilingInfo =
+                    profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+            if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+                LOG.warn(
+                        "Ignoring failed profiling instance in {} mode, which 
caused by no permission.",
+                        profilingInfo.getProfilingMode());
+                continue;
+            }
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.RUNNING,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Submitting profiling request should be succeed or 
no permission, but got errorMsg=%s",
+                            profilingInfo.getMessage()));
+            waitForProfilingFinished(profilingService);
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.FINISHED,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Profiling request should complete successful, but 
got errorMsg=%s",
+                            profilingInfo.getMessage()));
+        }
+
+        verifyRollingDeletion(profilingService);
+    }
+
+    private void verifyConfigsWorks(ProfilingService profilingService, Path 
configuredDir) {
+        Assertions.assertEquals(configuredDir.toString(), 
profilingService.getProfilingResultDir());
+        Assertions.assertEquals(historySizeLimit, 
profilingService.getHistorySizeLimit());
+    }
+
+    private void verifyRollingDeletion(ProfilingService profilingService) {
+        ArrayDeque<ProfilingInfo> profilingList =
+                profilingService.getProfilingListForTest(resourceID);
+        // Profiling History shouldn't exceed history size limit.
+        Assertions.assertEquals(historySizeLimit, profilingList.size());

Review Comment:
   Can we always assert this if some tests cannot be run due to lacking 
permission?
   How about `Assertions.assertTrue(profilingList.size() <= 
profilingService.getHistorySizeLimit());`?



##########
flink-dist/src/main/resources/META-INF/NOTICE:
##########
@@ -20,6 +20,7 @@ This project bundles the following dependencies under the 
Apache Software Licens
 - org.lz4:lz4-java:1.8.0
 - org.objenesis:objenesis:2.1
 - org.xerial.snappy:snappy-java:1.1.10.4
+- tools.profiler:async-profiler:2.9

Review Comment:
   This change should be included in the 1st commit.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();

Review Comment:
   It's better to add a `finally` here.



##########
flink-runtime-web/web-dashboard/src/app/components/humanize-watermark.pipe.ts:
##########
@@ -44,8 +44,8 @@ export class HumanizeWatermarkToDatetimePipe implements 
PipeTransform {
   constructor(private readonly configService: ConfigService) {}
 
   public transform(value: number): number | string {
-    if (isNaN(value) || value <= this.configService.LONG_MIN_VALUE) {
-      return '-';
+    if (value == null || isNaN(value) || value <= 
this.configService.LONG_MIN_VALUE) {
+      return 'N/A';

Review Comment:
   Why do we introduce these changes in this PR?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util.profiler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/** Unit tests for {@link ProfilingService}. */
+public class ProfilingServiceTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ProfilingServiceTest.class);
+    private static final Configuration configs = new Configuration();
+    private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf 
events.";
+    private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer 
symbols found.";
+    private static final String resourceID = "TestJobManager";
+    private static final long profilingDuration = 3L;
+    private static final int historySizeLimit = 3;
+
+    private ProfilingService profilingService;
+
+    @BeforeAll
+    static void beforeAll() {
+        configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit);
+    }
+
+    @BeforeEach
+    void setUp(@TempDir Path tempDir) {
+        configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString());
+        profilingService = ProfilingService.getInstance(configs);
+        verifyConfigsWorks(profilingService, tempDir);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        profilingService.close();
+    }
+
+    @Test
+    public void testSingleInstance() throws IOException {
+        ProfilingService instance = ProfilingService.getInstance(configs);
+        Assertions.assertEquals(profilingService, instance);
+        instance.close();
+    }
+
+    @Test
+    void testFailedRequestUnderProfiling() throws ExecutionException, 
InterruptedException {
+        ProfilingInfo profilingInfo =
+                profilingService
+                        .requestProfiling(resourceID, 10, 
ProfilingInfo.ProfilingMode.ITIMER)
+                        .get();
+        Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, 
profilingInfo.getStatus());
+        try {
+            profilingService
+                    .requestProfiling(
+                            resourceID, profilingDuration, 
ProfilingInfo.ProfilingMode.ITIMER)
+                    .get();
+            Assertions.fail("Duplicate profiling request should throw with 
IllegalStateException.");
+        } catch (Exception e) {
+            Assertions.assertTrue(e.getCause() instanceof 
IllegalStateException);
+        }
+    }
+
+    @Test
+    @Timeout(value = 1, unit = TimeUnit.MINUTES)
+    public void testAllProfilingMode() throws ExecutionException, 
InterruptedException {
+        for (ProfilingInfo.ProfilingMode mode : 
ProfilingInfo.ProfilingMode.values()) {
+            ProfilingInfo profilingInfo =
+                    profilingService.requestProfiling(resourceID, 
profilingDuration, mode).get();
+            if (isNoPermissionOrAllocateSymbol(profilingInfo)) {
+                LOG.warn(
+                        "Ignoring failed profiling instance in {} mode, which 
caused by no permission.",
+                        profilingInfo.getProfilingMode());
+                continue;
+            }
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.RUNNING,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Submitting profiling request should be succeed or 
no permission, but got errorMsg=%s",
+                            profilingInfo.getMessage()));
+            waitForProfilingFinished(profilingService);
+            Assertions.assertEquals(
+                    ProfilingInfo.ProfilingStatus.FINISHED,
+                    profilingInfo.getStatus(),
+                    String.format(
+                            "Profiling request should complete successful, but 
got errorMsg=%s",
+                            profilingInfo.getMessage()));
+        }
+
+        verifyRollingDeletion(profilingService);

Review Comment:
   It's better to introduce another unit test to only test the rolling 
deletion. 
   Let one unit test just do one thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to