Myasuka commented on code in PR #23820:
URL: https://github.com/apache/flink/pull/23820#discussion_r1411573550


##########
flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html:
##########
@@ -0,0 +1,108 @@
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<nz-card nzSize="small" style="width: 100%; height: 100%">
+  <div style="margin-bottom: 10px">
+    <form nz-form [nzLayout]="'inline'">
+      <nz-form-item>
+        <nz-form-label nzFor="duration">Profiling Duration</nz-form-label>
+        <nz-form-control>
+          <nz-input-number
+            [(ngModel)]="duration"
+            [nzMin]="1"
+            [nzStep]="30"
+            [nzFormatter]="formatterDuration"
+            [nzParser]="parserDuration"
+            nzPlaceHolder="Duration"
+            name="duration"
+          ></nz-input-number>
+        </nz-form-control>
+      </nz-form-item>
+      <nz-form-item>
+        <nz-form-label nzFor="mode">Profiling Mode</nz-form-label>

Review Comment:
   I think we should link to the async-profiler [wiki](https://github.com/async-profiler/async-profiler/wiki) so that users can look up what each profiling mode means.



##########
flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java:
##########
@@ -295,4 +295,40 @@ public class RestOptions {
                     .defaultValue(Duration.ofMinutes(5))
                     .withDescription(
                             "Maximum duration that the result of an async operation is stored. Once elapsed the result of the operation can no longer be retrieved.");
+
+    /** Enables the experimental profiler feature. */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Boolean> ENABLE_PROFILER =
+            key("rest.profiling.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enables the experimental profiler feature.");
+
+    /** Maximum history size of profiling list. */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Integer> MAX_PROFILING_HISTORY_SIZE =
+            key("rest.profiling.history-size")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "Maximum profiling history instance to be maintained for JobManager or each TaskManager. "
+                                    + "The oldest instance will be removed on a rolling basis when the history size exceeds this value.");
+
+    /** Maximum profiling duration for profiling function. */
+    @Documentation.Section(Documentation.Sections.EXPERT_REST)
+    public static final ConfigOption<Integer> MAX_PROFILING_DURATION =
+            key("rest.profiling.duration-max")
+                    .intType()
+                    .defaultValue(300)
+                    .withDescription(
+                            "Maximum profiling duration for each profiling request. "

Review Comment:
   We should state clearly that the time unit is seconds.
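   
   As a rough illustration only: the sketch below is plain Java and does not use Flink's `ConfigOption` API, and the class and constant names are hypothetical. It shows a description string that names the unit next to the default of 300.

```java
import java.time.Duration;

// Hypothetical sketch: not part of the PR, and not Flink's ConfigOption API.
final class ProfilingDurationDoc {
    static final String KEY = "rest.profiling.duration-max";
    static final int DEFAULT_MAX_SECONDS = 300;

    // The description names the unit so that users do not have to guess it.
    static String description() {
        return "Maximum profiling duration for each profiling request, in seconds. "
                + "Default: " + DEFAULT_MAX_SECONDS + " seconds ("
                + Duration.ofSeconds(DEFAULT_MAX_SECONDS).toMinutes() + " minutes).";
    }

    public static void main(String[] args) {
        System.out.println(KEY + ": " + description());
    }
}
```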



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ProfilingInfo.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.Serializable;
+
+/** Contains information of a Profiling Instance. */
+public class ProfilingInfo implements ResponseBody, Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final String FIELD_NAME_STATUS = "status";
+    public static final String FIELD_NAME_MODE = "mode";
+    public static final String FIELD_NAME_TRIGGER_TIME = "triggerTime";
+    public static final String FIELD_NAME_FINISHED_TIME = "finishedTime";
+    public static final String FIELD_NAME_DURATION = "duration";
+    public static final String FIELD_NAME_MESSAGE = "message";
+    public static final String FIELD_NAME_OUTPUT_PATH = "file";

Review Comment:
   I think we should align the names: either use `outputFile` as the field name here, or rename the `outputFile` variable to `file`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ProfilingService.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.messages.ProfilingInfo;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import one.profiler.AsyncProfiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/** Create and keep profiling requests with rolling policy. */
+public class ProfilingService implements Closeable {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(ProfilingService.class);
+    private static ProfilingService instance;
+    private final Map<String, ArrayDeque<ProfilingInfo>> profilingMap;
+    private ScheduledFuture<?> profilingFuture;
+    private final String profilingResultDir;
+    private final int historySize;
+    private final ScheduledExecutorService scheduledExecutor;
+
+    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd_HH_mm_ss");
+
+    public static ProfilingService getInstance(Configuration configs) {
+        if (instance == null) {
+            instance = new ProfilingService(configs);
+        }
+        return instance;
+    }
+
+    private ProfilingService(Configuration configs) {
+        this.profilingMap = new HashMap<>();
+        this.historySize = configs.getInteger(RestOptions.MAX_PROFILING_HISTORY_SIZE);
+        Preconditions.checkArgument(
+                historySize > 0,
+                String.format(
+                        "Configured %s must be positive.",
+                        RestOptions.MAX_PROFILING_HISTORY_SIZE.key()));
+        this.profilingResultDir = configs.getString(RestOptions.PROFILING_RESULT_DIR);
+        this.scheduledExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        new ExecutorThreadFactory.Builder()
+                                .setPoolName("flink-profiling-service")
+                                .build());
+    }
+
+    public CompletableFuture<ProfilingInfo> requestProfiling(
+            String resourceID, long duration, ProfilingInfo.ProfilingMode mode) {
+        if (profilingFuture != null && !profilingFuture.isDone()) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(resourceID + " is still under profiling."));
+        }
+        ProfilingInfo profilingInfo = ProfilingInfo.create(duration, mode);
+        profilingMap.putIfAbsent(resourceID, new ArrayDeque<>());
+        profilingMap.get(resourceID).addFirst(profilingInfo);
+        AsyncProfiler profiler = AsyncProfiler.getInstance();
+        try {
+            String response =
+                    profiler.execute(
+                            ProfilerConstants.COMMAND_START.msg
+                                    + profilingInfo.getProfilingMode().getCode());
+            if (StringUtils.isNullOrWhitespaceOnly(response)
+                    || !response.startsWith(ProfilerConstants.PROFILER_STARTED_SUCCESS.msg)) {
+                return CompletableFuture.completedFuture(
+                        profilingInfo.fail("Start profiler failed. " + response));
+            }
+        } catch (Exception e) {
+            return CompletableFuture.completedFuture(
+                    profilingInfo.fail("Start profiler failed. " + e));
+        }
+
+        this.profilingFuture =
+                scheduledExecutor.schedule(
+                        () -> stopProfiling(resourceID), duration, TimeUnit.SECONDS);
+
+        return CompletableFuture.completedFuture(profilingInfo);
+    }
+
+    public String getProfilingResultDir() {
+        return profilingResultDir;
+    }
+
+    private void stopProfiling(String resourceID) {
+        AsyncProfiler profiler = AsyncProfiler.getInstance();
+        ArrayDeque<ProfilingInfo> profilingList = profilingMap.get(resourceID);
+        Preconditions.checkState(!CollectionUtil.isNullOrEmpty(profilingList));
+        ProfilingInfo info = profilingList.getFirst();
+        try {
+            String fileName = formatOutputFileName(resourceID);
+            String outputPath = new File(profilingResultDir, fileName).getPath();
+            String response = profiler.execute(ProfilerConstants.COMMAND_STOP.msg + outputPath);
+            if (!StringUtils.isNullOrWhitespaceOnly(response)
+                    && response.startsWith(ProfilerConstants.PROFILER_STOPPED_SUCCESS.msg)) {
+                info.success(fileName);
+            } else {
+                info.fail("Stop profiler failed. " + response);
+            }
+            rollingClearing(profilingList);
+        } catch (Throwable e) {
+            info.fail("Stop profiler failed. " + e);
+        }
+    }
+
+    private void rollingClearing(ArrayDeque<ProfilingInfo> profilingList) {
+        while (profilingList.size() > historySize) {
+            ProfilingInfo info = profilingList.pollLast();
+            String outputFile = info.getOutputFile();
+            if (StringUtils.isNullOrWhitespaceOnly(outputFile)) {
+                continue;
+            }
+            try {
+                Files.deleteIfExists(Paths.get(profilingResultDir, outputFile));
+            } catch (Exception e) {
+                LOG.error(String.format("Clearing file for %s failed. Skipped.", info), e);
+            }
+        }
+    }
+
+    private String formatOutputFileName(String resourceID) {
+        return String.format("%s_%s.html", resourceID, sdf.format(new Date()));

Review Comment:
   I think we should include the profiler mode in the file name, so that results from different modes are easier to tell apart.
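   
   A minimal sketch of what that could look like, assuming the mode is passed in as a string. The class name, the extra parameters, and the lower-casing are hypothetical, and it uses `DateTimeFormatter` in place of the `SimpleDateFormat` used in the PR:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical sketch of the suggestion; names and signature are illustrative only.
final class ProfilingFileNames {
    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH_mm_ss");

    // Produces <resourceID>_<mode>_<timestamp>.html
    static String formatOutputFileName(String resourceID, String mode, LocalDateTime time) {
        return String.format(
                "%s_%s_%s.html", resourceID, mode.toLowerCase(Locale.ROOT), FORMAT.format(time));
    }

    public static void main(String[] args) {
        System.out.println(formatOutputFileName("JobManager", "CPU", LocalDateTime.now()));
    }
}
```

   Passing the timestamp in as a parameter keeps the method deterministic, which makes the naming scheme easy to unit test.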



##########
docs/layouts/shortcodes/generated/rest_configuration.html:
##########
@@ -116,6 +116,30 @@
             <td>Integer</td>
             <td>The port that the client connects to. If rest.bind-port has not been specified, then the REST server will bind to this port. Attention: This option is respected only if the high-availability configuration is NONE.</td>
         </tr>
+        <tr>
+            <td><h5>rest.profiling.dir</h5></td>
+            <td style="word-wrap: break-word;">"/var/folders/19/05mc16fs0p13z3sj2y0qkzx00000ks/T/"</td>

Review Comment:
   Same here, the value is strange.



##########
docs/layouts/shortcodes/generated/expert_rest_section.html:
##########
@@ -86,6 +86,30 @@
             <td>Long</td>
             <td>The maximum time in ms for a connection to stay idle before failing.</td>
         </tr>
+        <tr>
+            <td><h5>rest.profiling.dir</h5></td>
+            <td style="word-wrap: break-word;">"/var/folders/19/05mc16fs0p13z3sj2y0qkzx00000ks/T/"</td>

Review Comment:
   Why does this configuration point to such a strange directory?
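   
   The value looks like a macOS per-user temporary directory. One possible explanation, which is an assumption because the option's definition is not quoted in this review, is that the default is computed from the `java.io.tmpdir` system property, so the generated docs capture whatever the generating machine returns. The hypothetical sketch below shows why such a default is machine-specific:

```java
// Sketch under the assumption stated above; the PR's actual default is not shown in this review.
final class TmpDirDefault {
    // Evaluated on whichever JVM runs this code, so the result differs per machine and user.
    static String machineSpecificDefault() {
        return System.getProperty("java.io.tmpdir");
    }

    public static void main(String[] args) {
        System.out.println(machineSpecificDefault());
    }
}
```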



##########
flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html:
##########
@@ -0,0 +1,108 @@
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<nz-card nzSize="small" style="width: 100%; height: 100%">
+  <div style="margin-bottom: 10px">
+    <form nz-form [nzLayout]="'inline'">
+      <nz-form-item>
+        <nz-form-label nzFor="duration">Profiling Duration</nz-form-label>
+        <nz-form-control>
+          <nz-input-number
+            [(ngModel)]="duration"
+            [nzMin]="1"
+            [nzStep]="30"
+            [nzFormatter]="formatterDuration"
+            [nzParser]="parserDuration"
+            nzPlaceHolder="Duration"
+            name="duration"
+          ></nz-input-number>
+        </nz-form-control>
+      </nz-form-item>
+      <nz-form-item>
+        <nz-form-label nzFor="mode">Profiling Mode</nz-form-label>

Review Comment:
   Moreover, it would be better to give each mode a short description.
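   
   For illustration, a sketch of one way to keep a description next to each mode. The enum name and the wording are hypothetical; the descriptions are my paraphrase of the async-profiler documentation and should be checked against the wiki before use.

```java
// Hypothetical sketch: the enum name and the wording are illustrative, not the PR's code.
enum ProfilingModeHelp {
    CPU("Samples stack traces of threads that are running on a CPU."),
    LOCK("Samples threads that are contending for locks."),
    WALL("Samples all threads at a fixed interval, whether they are running, sleeping, or blocked."),
    ALLOC("Samples heap allocations."),
    ITIMER("Samples CPU time via the itimer signal, without relying on perf_events.");

    private final String description;

    ProfilingModeHelp(String description) {
        this.description = description;
    }

    String description() {
        return description;
    }

    public static void main(String[] args) {
        // Print one "MODE: description" line per mode, e.g. for a tooltip or help text.
        for (ProfilingModeHelp mode : values()) {
            System.out.println(mode + ": " + mode.description());
        }
    }
}
```

   The constant names match the `nzValue` entries in the mode selector, so the same text could back a tooltip for each option.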



##########
flink-runtime-web/web-dashboard/src/app/pages/job-manager/profiler/job-manager-profiler.component.html:
##########
@@ -0,0 +1,108 @@
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<nz-card nzSize="small" style="width: 100%; height: 100%">
+  <div style="margin-bottom: 10px">
+    <form nz-form [nzLayout]="'inline'">
+      <nz-form-item>
+        <nz-form-label nzFor="duration">Profiling Duration</nz-form-label>
+        <nz-form-control>
+          <nz-input-number
+            [(ngModel)]="duration"
+            [nzMin]="1"
+            [nzStep]="30"
+            [nzFormatter]="formatterDuration"
+            [nzParser]="parserDuration"
+            nzPlaceHolder="Duration"
+            name="duration"
+          ></nz-input-number>
+        </nz-form-control>
+      </nz-form-item>
+      <nz-form-item>
+        <nz-form-label nzFor="mode">Profiling Mode</nz-form-label>
+        <nz-form-control>
+          <nz-select
+            [(ngModel)]="selectMode"
+            nzPlaceHolder="Profiling Mode"
+            name="mode"
+            style="width: 200px"
+          >
+            <nz-option nzValue="CPU" nzLabel="CPU"></nz-option>
+            <nz-option nzValue="LOCK" nzLabel="Lock"></nz-option>
+            <nz-option nzValue="WALL" nzLabel="Wall-Clock"></nz-option>
+            <nz-option nzValue="ALLOC" nzLabel="Allocation"></nz-option>
+            <nz-option nzValue="ITIMER" nzLabel="ITIMER"></nz-option>
+          </nz-select>
+        </nz-form-control>
+      </nz-form-item>
+      <nz-form-item>
+        <nz-form-control>
+          <button
+            nz-button
+            nzType="primary"
+            [nzLoading]="isCreating"
+            [disabled]="duration === null || !isEnabled"
+            (click)="createProfilingInstance()"
+            style="margin-left: 10px"
+          >
+            Create Profiling Instance
+          </button>
+        </nz-form-control>
+      </nz-form-item>
+    </form>
+    <nz-alert
+      *ngIf="!isEnabled"
+      nzType="warning"
+      style="margin-top: 10px"
+      nzShowIcon
+      nzMessage="You need to set the config `rest.profiling.enabled=true` to enable this experimental profiler feature."
+    ></nz-alert>
+  </div>
+  <nz-table
+    [nzSize]="'small'"
+    [nzData]="profilingList"
+    [nzLoading]="isLoading"
+    [nzFrontPagination]="false"
+    [nzShowPagination]="false"
+  >
+    <thead>
+      <tr>
+        <th nzWidth="5%">Index</th>
+        <th nzWidth="15%">Trigger Time</th>
+        <th nzWidth="15%">Finished Time</th>
+        <th nzWidth="10%">Profiling Duration</th>
+        <th nzWidth="5%">Mode</th>
+        <th nzWidth="10%">Status</th>
+        <th nzWidth="15%">Message</th>
+        <th nzWidth="25%">Link</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr *ngFor="let info of profilingList; let id = index">
+        <td>{{ id }}</td>
+        <td>{{ info.triggerTime | humanizeWatermarkToDatetime }}</td>
+        <td>{{ info.finishedTime | humanizeWatermarkToDatetime }}</td>
+        <td>{{ info.duration }} s</td>
+        <td>{{ info.mode }}</td>
+        <td>{{ info.status }}</td>
+        <td>{{ info.message }}</td>
+        <td>
+          <a (click)="downloadProfilingResult(info.file)">{{ info.file }}</a>

Review Comment:
   Since we can get the result file in most cases, I would prefer to swap the positions of `Message` and `Link`:
   <img width="1433" alt="image" src="https://github.com/apache/flink/assets/1709104/59364343-afd5-4ab5-8047-346c81c91daa">
   
   Moreover, if the link can be generated, we should fill in some content in the message field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
