Myasuka commented on code in PR #23820: URL: https://github.com/apache/flink/pull/23820#discussion_r1423532806
########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { Review Comment: You can extend this class from `TestLogger`, so that you could have a `log` to print logs. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import one.profiler.AsyncProfiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Create and keep profiling requests with rolling policy. */ +public class ProfilingService implements Closeable { + + protected static final Logger LOG = LoggerFactory.getLogger(ProfilingService.class); + private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss"); + private static ProfilingService instance; + private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap; + private final String profilingResultDir; + private final int historySizeLimit; + private final ScheduledExecutorService scheduledExecutor; + private ProfilingFuture profilingFuture; + + private ProfilingService(Configuration configs) { + this.profilingMap = new HashMap<>(); + this.historySizeLimit = configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE); + Preconditions.checkArgument( + historySizeLimit > 0, + String.format( + "Configured %s must be positive.", + RestOptions.MAX_PROFILING_HISTORY_SIZE.key())); + this.profilingResultDir = configs.getString(RestOptions.PROFILING_RESULT_DIR); + this.scheduledExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory.Builder() + .setPoolName("flink-profiling-service") + .build()); + } + + public static ProfilingService getInstance(Configuration configs) { + if (instance == null) { + instance = new ProfilingService(configs); + } + return instance; + } + + public CompletableFuture<ProfilingInfo> requestProfiling( + String resourceID, long duration, ProfilingInfo.ProfilingMode mode) { + if (profilingFuture != null && !profilingFuture.isDone()) { + return FutureUtils.completedExceptionally( + new IllegalStateException(resourceID + " is still under profiling.")); + } + ProfilingInfo profilingInfo = ProfilingInfo.create(duration, mode); + profilingMap.putIfAbsent(resourceID, new ArrayDeque<>()); + profilingMap.get(resourceID).addFirst(profilingInfo); + 
AsyncProfiler profiler = AsyncProfiler.getInstance(); + try { + String response = + profiler.execute( + ProfilerConstants.COMMAND_START.msg + + profilingInfo.getProfilingMode().getCode()); + if (StringUtils.isNullOrWhitespaceOnly(response) + || !response.startsWith(ProfilerConstants.PROFILER_STARTED_SUCCESS.msg)) { + return CompletableFuture.completedFuture( + profilingInfo.fail("Start profiler failed. " + response)); + } + } catch (Exception e) { + return CompletableFuture.completedFuture( + profilingInfo.fail("Start profiler failed. " + e)); + } + + scheduledExecutor.schedule(() -> stopProfiling(resourceID), duration, TimeUnit.SECONDS); + + this.profilingFuture = new ProfilingFuture(duration, () -> stopProfiling(resourceID)); + return CompletableFuture.completedFuture(profilingInfo); + } + + private void stopProfiling(String resourceID) { + AsyncProfiler profiler = AsyncProfiler.getInstance(); + ArrayDeque<ProfilingInfo> profilingList = profilingMap.get(resourceID); + Preconditions.checkState(!CollectionUtil.isNullOrEmpty(profilingList)); + ProfilingInfo info = profilingList.getFirst(); + try { + String fileName = formatOutputFileName(resourceID, info); + String outputPath = new File(profilingResultDir, fileName).getPath(); + String response = profiler.execute(ProfilerConstants.COMMAND_STOP.msg + outputPath); + if (!StringUtils.isNullOrWhitespaceOnly(response) + && response.startsWith(ProfilerConstants.PROFILER_STOPPED_SUCCESS.msg)) { + info.success(fileName); + } else { + info.fail("Stop profiler failed. " + response); + } + rollingClearing(profilingList); + } catch (Throwable e) { + info.fail("Stop profiler failed. " + e); + } + } + + private void rollingClearing(ArrayDeque<ProfilingInfo> profilingList) { + while (profilingList.size() > historySizeLimit) { + ProfilingInfo info = profilingList.pollLast(); + String outputFile = info != null ? 
info.getOutputFile() : ""; + if (StringUtils.isNullOrWhitespaceOnly(outputFile)) { + continue; + } + try { + Files.deleteIfExists(Paths.get(profilingResultDir, outputFile)); + } catch (Exception e) { + LOG.error(String.format("Clearing file for %s failed. Skipped.", info), e); + } + } + } + + private String formatOutputFileName(String resourceID, ProfilingInfo info) { + return String.format( + "%s_%s_%s.html", resourceID, info.getProfilingMode(), sdf.format(new Date())); + } + + @Override + public void close() throws IOException { + try { + if (profilingFuture != null && !profilingFuture.isDone()) { + profilingFuture.cancel(); + } + scheduledExecutor.shutdownNow(); Review Comment: Can we add some checks here? ~~~java if (!scheduledExecutor.isShutdown()) { scheduledExecutor.shutdownNow(); } ~~~ ########## flink-runtime/src/main/java/org/apache/flink/runtime/util/profiler/ProfilingService.java: ########## @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; + +import one.profiler.AsyncProfiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** Create and keep profiling requests with rolling policy. 
*/ +public class ProfilingService implements Closeable { + + protected static final Logger LOG = LoggerFactory.getLogger(ProfilingService.class); + private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss"); + private static ProfilingService instance; + private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap; + private final String profilingResultDir; + private final int historySizeLimit; + private final ScheduledExecutorService scheduledExecutor; + private ProfilingFuture profilingFuture; + + private ProfilingService(Configuration configs) { + this.profilingMap = new HashMap<>(); + this.historySizeLimit = configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE); + Preconditions.checkArgument( + historySizeLimit > 0, + String.format( + "Configured %s must be positive.", + RestOptions.MAX_PROFILING_HISTORY_SIZE.key())); + this.profilingResultDir = configs.getString(RestOptions.PROFILING_RESULT_DIR); + this.scheduledExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory.Builder() + .setPoolName("flink-profiling-service") + .build()); + } + + public static ProfilingService getInstance(Configuration configs) { + if (instance == null) { + instance = new ProfilingService(configs); + } + return instance; + } Review Comment: I think this getter is not a real singleton: the lazy `instance == null` check is unsynchronized, so concurrent callers could each create their own instance. Can we make it a proper (thread-safe) singleton? ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. 
*/ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); + } + + @Test + void testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { + ProfilingInfo profilingInfo = + profilingService + .requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); + try { + profilingService + .requestProfiling( + resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testAllProfilingMode() throws 
ExecutionException, InterruptedException { + for (ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { + ProfilingInfo profilingInfo = + profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); + if (isNoPermissionOrAllocateSymbol(profilingInfo)) { + LOG.warn( + "Ignoring failed profiling instance in {} mode, which caused by no permission.", Review Comment: I think there are other failure causes here besides `no permission` (e.g. missing AllocTracer symbols), so this log message shouldn't mention only permissions. ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); + } + + @Test + void 
testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { + ProfilingInfo profilingInfo = + profilingService + .requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); + try { + profilingService + .requestProfiling( + resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testAllProfilingMode() throws ExecutionException, InterruptedException { + for (ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { + ProfilingInfo profilingInfo = + profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); + if (isNoPermissionOrAllocateSymbol(profilingInfo)) { + LOG.warn( + "Ignoring failed profiling instance in {} mode, which caused by no permission.", + profilingInfo.getProfilingMode()); + continue; + } + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.RUNNING, + profilingInfo.getStatus(), + String.format( + "Submitting profiling request should be succeed or no permission, but got errorMsg=%s", + profilingInfo.getMessage())); + waitForProfilingFinished(profilingService); + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.FINISHED, + profilingInfo.getStatus(), + String.format( + "Profiling request should complete successful, but got errorMsg=%s", + profilingInfo.getMessage())); + } + + verifyRollingDeletion(profilingService); + } + + private void verifyConfigsWorks(ProfilingService profilingService, Path configuredDir) { + Assertions.assertEquals(configuredDir.toString(), profilingService.getProfilingResultDir()); + Assertions.assertEquals(historySizeLimit, 
profilingService.getHistorySizeLimit()); + } + + private void verifyRollingDeletion(ProfilingService profilingService) { + ArrayDeque<ProfilingInfo> profilingList = + profilingService.getProfilingListForTest(resourceID); + // Profiling History shouldn't exceed history size limit. + Assertions.assertEquals(historySizeLimit, profilingList.size()); + // Profiling History files should be rolling deleted. + Set<String> resultFileNames = new HashSet<>(); + File configuredDir = new File(profilingService.getProfilingResultDir()); + for (File f : Objects.requireNonNull(configuredDir.listFiles())) { + resultFileNames.add(f.getName()); + } + Assertions.assertEquals(profilingList.size(), resultFileNames.size()); + for (ProfilingInfo profilingInfo : profilingList) { + String outputFile = profilingInfo.getOutputFile(); + Assertions.assertTrue(resultFileNames.contains(outputFile)); + } + } + + private void waitForProfilingFinished(ProfilingService profilingService) { + while (!profilingService.getProfilingFuture().isDone()) {} Review Comment: It's better to add a `Thread#sleep` here, so that this empty loop doesn't busy-spin a CPU core while waiting. ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); Review Comment: I don't think we need to verify the configs within the `setup`. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. 
*/ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); + } + + @Test + void testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { + ProfilingInfo profilingInfo = + profilingService + .requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); + try { + profilingService + .requestProfiling( + resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testAllProfilingMode() throws 
ExecutionException, InterruptedException { + for (ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { + ProfilingInfo profilingInfo = + profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); + if (isNoPermissionOrAllocateSymbol(profilingInfo)) { + LOG.warn( + "Ignoring failed profiling instance in {} mode, which caused by no permission.", + profilingInfo.getProfilingMode()); + continue; + } + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.RUNNING, + profilingInfo.getStatus(), + String.format( + "Submitting profiling request should be succeed or no permission, but got errorMsg=%s", + profilingInfo.getMessage())); + waitForProfilingFinished(profilingService); + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.FINISHED, + profilingInfo.getStatus(), + String.format( + "Profiling request should complete successful, but got errorMsg=%s", + profilingInfo.getMessage())); + } + + verifyRollingDeletion(profilingService); + } + + private void verifyConfigsWorks(ProfilingService profilingService, Path configuredDir) { + Assertions.assertEquals(configuredDir.toString(), profilingService.getProfilingResultDir()); + Assertions.assertEquals(historySizeLimit, profilingService.getHistorySizeLimit()); + } + + private void verifyRollingDeletion(ProfilingService profilingService) { + ArrayDeque<ProfilingInfo> profilingList = + profilingService.getProfilingListForTest(resourceID); + // Profiling History shouldn't exceed history size limit. + Assertions.assertEquals(historySizeLimit, profilingList.size()); + // Profiling History files should be rolling deleted. 
+ Set<String> resultFileNames = new HashSet<>(); + File configuredDir = new File(profilingService.getProfilingResultDir()); + for (File f : Objects.requireNonNull(configuredDir.listFiles())) { + resultFileNames.add(f.getName()); + } + Assertions.assertEquals(profilingList.size(), resultFileNames.size()); + for (ProfilingInfo profilingInfo : profilingList) { + String outputFile = profilingInfo.getOutputFile(); + Assertions.assertTrue(resultFileNames.contains(outputFile)); + } + } + + private void waitForProfilingFinished(ProfilingService profilingService) { + while (!profilingService.getProfilingFuture().isDone()) {} + } + + /** + * Check profiling instance failed caused by no permission to perf_events or missing of JDK + * debug symbols. + * + * @return true if no permission to access perf_events. Review Comment: ```suggestion * @return true if no permission to access perf_events or no AllocTracer symbols found. ``` ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); + } + + @Test + void 
testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { + ProfilingInfo profilingInfo = + profilingService + .requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); + try { + profilingService + .requestProfiling( + resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testAllProfilingMode() throws ExecutionException, InterruptedException { + for (ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { + ProfilingInfo profilingInfo = + profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); + if (isNoPermissionOrAllocateSymbol(profilingInfo)) { + LOG.warn( + "Ignoring failed profiling instance in {} mode, which caused by no permission.", + profilingInfo.getProfilingMode()); + continue; + } + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.RUNNING, + profilingInfo.getStatus(), + String.format( + "Submitting profiling request should be succeed or no permission, but got errorMsg=%s", + profilingInfo.getMessage())); + waitForProfilingFinished(profilingService); + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.FINISHED, + profilingInfo.getStatus(), + String.format( + "Profiling request should complete successful, but got errorMsg=%s", + profilingInfo.getMessage())); + } + + verifyRollingDeletion(profilingService); + } + + private void verifyConfigsWorks(ProfilingService profilingService, Path configuredDir) { + Assertions.assertEquals(configuredDir.toString(), profilingService.getProfilingResultDir()); + Assertions.assertEquals(historySizeLimit, 
profilingService.getHistorySizeLimit()); + } + + private void verifyRollingDeletion(ProfilingService profilingService) { + ArrayDeque<ProfilingInfo> profilingList = + profilingService.getProfilingListForTest(resourceID); + // Profiling History shouldn't exceed history size limit. + Assertions.assertEquals(historySizeLimit, profilingList.size()); Review Comment: Can we always assert this when some profiling runs are skipped due to missing permissions? How about `Assertions.assertTrue(profilingList.size() <= profilingService.getHistorySizeLimit());`? ########## flink-dist/src/main/resources/META-INF/NOTICE: ########## @@ -20,6 +20,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.lz4:lz4-java:1.8.0 - org.objenesis:objenesis:2.1 - org.xerial.snappy:snappy-java:1.1.10.4 +- tools.profiler:async-profiler:2.9 Review Comment: This change should be included in the 1st commit. ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); Review Comment: It's better to add a 
`finally` here. ########## flink-runtime-web/web-dashboard/src/app/components/humanize-watermark.pipe.ts: ########## @@ -44,8 +44,8 @@ export class HumanizeWatermarkToDatetimePipe implements PipeTransform { constructor(private readonly configService: ConfigService) {} public transform(value: number): number | string { - if (isNaN(value) || value <= this.configService.LONG_MIN_VALUE) { - return '-'; + if (value == null || isNaN(value) || value <= this.configService.LONG_MIN_VALUE) { + return 'N/A'; Review Comment: Why do we introduce these changes in this PR? ########## flink-runtime/src/test/java/org/apache/flink/runtime/util/profiler/ProfilingServiceTest.java: ########## @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.util.profiler; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.runtime.rest.messages.ProfilingInfo; +import org.apache.flink.util.StringUtils; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** Unit tests for {@link ProfilingService}. */ +public class ProfilingServiceTest { + + private static final Logger LOG = LoggerFactory.getLogger(ProfilingServiceTest.class); + private static final Configuration configs = new Configuration(); + private static final String NO_ACCESS_TO_PERF_EVENTS = "No access to perf events."; + private static final String NO_ALLOC_SYMBOL_FOUND = "No AllocTracer symbols found."; + private static final String resourceID = "TestJobManager"; + private static final long profilingDuration = 3L; + private static final int historySizeLimit = 3; + + private ProfilingService profilingService; + + @BeforeAll + static void beforeAll() { + configs.set(RestOptions.MAX_PROFILING_HISTORY_SIZE, historySizeLimit); + } + + @BeforeEach + void setUp(@TempDir Path tempDir) { + configs.set(RestOptions.PROFILING_RESULT_DIR, tempDir.toString()); + profilingService = ProfilingService.getInstance(configs); + verifyConfigsWorks(profilingService, tempDir); + } + + @AfterEach + void tearDown() throws IOException { + profilingService.close(); + } + + @Test + public void testSingleInstance() throws IOException { + ProfilingService instance = ProfilingService.getInstance(configs); + Assertions.assertEquals(profilingService, instance); + instance.close(); + } + + @Test + void 
testFailedRequestUnderProfiling() throws ExecutionException, InterruptedException { + ProfilingInfo profilingInfo = + profilingService + .requestProfiling(resourceID, 10, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.assertEquals(ProfilingInfo.ProfilingStatus.RUNNING, profilingInfo.getStatus()); + try { + profilingService + .requestProfiling( + resourceID, profilingDuration, ProfilingInfo.ProfilingMode.ITIMER) + .get(); + Assertions.fail("Duplicate profiling request should throw with IllegalStateException."); + } catch (Exception e) { + Assertions.assertTrue(e.getCause() instanceof IllegalStateException); + } + } + + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testAllProfilingMode() throws ExecutionException, InterruptedException { + for (ProfilingInfo.ProfilingMode mode : ProfilingInfo.ProfilingMode.values()) { + ProfilingInfo profilingInfo = + profilingService.requestProfiling(resourceID, profilingDuration, mode).get(); + if (isNoPermissionOrAllocateSymbol(profilingInfo)) { + LOG.warn( + "Ignoring failed profiling instance in {} mode, which caused by no permission.", + profilingInfo.getProfilingMode()); + continue; + } + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.RUNNING, + profilingInfo.getStatus(), + String.format( + "Submitting profiling request should be succeed or no permission, but got errorMsg=%s", + profilingInfo.getMessage())); + waitForProfilingFinished(profilingService); + Assertions.assertEquals( + ProfilingInfo.ProfilingStatus.FINISHED, + profilingInfo.getStatus(), + String.format( + "Profiling request should complete successful, but got errorMsg=%s", + profilingInfo.getMessage())); + } + + verifyRollingDeletion(profilingService); Review Comment: It's better to introduce another unit test to only test the rolling deletion. Let one unit test just do one thing. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org