Myasuka commented on code in PR #24041:
URL: https://github.com/apache/flink/pull/24041#discussion_r1451675133


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##########
@@ -225,17 +225,34 @@ CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByType(
 
     /**
      * Request the file upload from the given {@link TaskExecutor} to the 
cluster's {@link
-     * BlobServer}. The corresponding {@link TransientBlobKey} is returned.
+     * BlobServer}. The corresponding {@link TransientBlobKey} is returned. To 
support different
+     * type file upload with name, using {@link
+     * ResourceManager#requestTaskManagerFileUploadByNameAndType} as instead.
      *
      * @param taskManagerId identifying the {@link TaskExecutor} to upload the 
specified file
      * @param fileName name of the file to upload
      * @param timeout for the asynchronous operation
      * @return Future which is completed with the {@link TransientBlobKey} 
after uploading the file
      *     to the {@link BlobServer}.
      */
+    @Deprecated
     CompletableFuture<TransientBlobKey> requestTaskManagerFileUploadByName(
             ResourceID taskManagerId, String fileName, @RpcTimeout Time 
timeout);
 
+    /**
+     * Request the file upload from the given {@link TaskExecutor} to the 
cluster's {@link
+     * BlobServer}. The corresponding {@link TransientBlobKey} is returned.
+     *
+     * @param taskManagerId identifying the {@link TaskExecutor} to upload the 
specified file
+     * @param fileName name of the file to upload
+     * @param fileType type of the file to upload
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the {@link TransientBlobKey} 
after uploading the file
+     *     to the {@link BlobServer}.
+     */
+    CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByNameAndType(
+            ResourceID taskManagerId, String fileName, FileType fileType, 
@RpcTimeout Time timeout);

Review Comment:
   Same here: I prefer `Duration` over `Time`, just as 
[FLINK-14819](https://issues.apache.org/jira/browse/FLINK-14819) did.



##########
flink-runtime-web/web-dashboard/src/app/pages/task-manager/profiler/task-manager-profiler.component.html:
##########
@@ -0,0 +1,91 @@
+<!--
+  ~   Licensed to the Apache Software Foundation (ASF) under one
+  ~   or more contributor license agreements.  See the NOTICE file
+  ~   distributed with this work for additional information
+  ~   regarding copyright ownership.  The ASF licenses this file
+  ~   to you under the Apache License, Version 2.0 (the
+  ~   "License"); you may not use this file except in compliance
+  ~   with the License.  You may obtain a copy of the License at
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~   Unless required by applicable law or agreed to in writing, software
+  ~   distributed under the License is distributed on an "AS IS" BASIS,
+  ~   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~   See the License for the specific language governing permissions and
+  ~   limitations under the License.
+  -->
+
+<nz-card nzSize="small" style="width: 100%; height: 100%">
+  <div style="margin-bottom: 10px">
+    <form nz-form [nzLayout]="'inline'">
+      <nz-form-item>
+        <nz-form-label nzFor="duration">Profiling Duration</nz-form-label>
+        <nz-form-control>
+          <nz-input-number
+            [(ngModel)]="duration"
+            [nzMin]="1"
+            [nzStep]="30"
+            [nzFormatter]="formatterDuration"
+            [nzParser]="parserDuration"
+            nzPlaceHolder="Duration"
+            name="duration"
+          ></nz-input-number>
+        </nz-form-control>
+      </nz-form-item>
+      <nz-form-item>
+        <nz-form-control>
+          <button
+            nz-button
+            nzType="primary"
+            [nzLoading]="isCreating"
+            [disabled]="duration === null || !isEnabled"
+            (click)="createProfilingInstance()"
+            style="margin-left: 10px"
+          >
+            Create Profiling Instance
+          </button>
+        </nz-form-control>
+      </nz-form-item>
+    </form>
+    <nz-alert
+      *ngIf="!isEnabled"
+      nzType="warning"
+      style="margin-top: 10px"
+      nzShowIcon
+      nzMessage="You need to set the config `rest.profiler.enabled=true` to 
enable this experimental profiler feature."

Review Comment:
   The hint does not match the current implementation; we should use the 
YAML-style hint `rest.profiling.enabled: true`.
   
   The `job-manager-profiler.component.html` should also be changed.
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##########
@@ -282,4 +284,30 @@ CompletableFuture<ThreadDumpInfo> requestThreadDump(
      */
     CompletableFuture<TaskExecutorThreadInfoGateway> 
requestTaskExecutorThreadInfoGateway(
             ResourceID taskManagerId, @RpcTimeout Time timeout);
+
+    /**
+     * Request profiling list from the given {@link TaskExecutor}.
+     *
+     * @param taskManagerId identifying the {@link TaskExecutor} to get 
profiling list from
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the historical profiling list
+     */
+    CompletableFuture<Collection<ProfilingInfo>> 
requestTaskManagerProfilingList(
+            ResourceID taskManagerId, @RpcTimeout Time timeout);
+
+    /**
+     * Requests the profiling instance from the given {@link TaskExecutor}.
+     *
+     * @param taskManagerId taskManagerId identifying the {@link TaskExecutor} 
to get the profiling
+     *     from
+     * @param duration profiling duration
+     * @param mode profiling mode {@link ProfilingMode}
+     * @param timeout timeout of the asynchronous operation
+     * @return Future containing the created profiling information
+     */
+    CompletableFuture<ProfilingInfo> requestProfiling(
+            ResourceID taskManagerId,
+            int duration,
+            ProfilingInfo.ProfilingMode mode,
+            @RpcTimeout Time timeout);

Review Comment:
   Same suggestion here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java:
##########
@@ -257,6 +257,18 @@ CompletableFuture<TransientBlobKey> 
requestFileUploadByType(
     CompletableFuture<TransientBlobKey> requestFileUploadByName(
             String fileName, @RpcTimeout Time timeout);
 
+    /**
+     * Requests the file upload of the specified name and file type to the 
cluster's {@link
+     * BlobServer}.
+     *
+     * @param fileName to upload
+     * @param fileType to upload
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the {@link TransientBlobKey} of 
the uploaded file.
+     */
+    CompletableFuture<TransientBlobKey> requestFileUploadByNameAndType(
+            String fileName, FileType fileType, @RpcTimeout Time timeout);

Review Comment:
   For newly added interfaces, I prefer `Duration` over `Time`, 
just as FLINK-14819 did.
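
   To illustrate the suggestion, here is a minimal sketch of a gateway method that takes `java.time.Duration` as its timeout. It is not the Flink code: `FileUploadGateway`, the `FileType` stand-in enum, the `String` return value (in place of `TransientBlobKey`) and the positive-timeout check are all assumptions made so the example compiles on its own, and the `@RpcTimeout` annotation is omitted.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class DurationTimeoutSketch {

    /** Hypothetical stand-in for Flink's FileType; the values are illustrative only. */
    enum FileType { LOG, STDOUT, PROFILER }

    /** Hypothetical gateway; the real method would return a TransientBlobKey. */
    interface FileUploadGateway {
        CompletableFuture<String> requestFileUploadByNameAndType(
                String fileName, FileType fileType, Duration timeout);
    }

    /** Builds a toy gateway that rejects non-positive timeouts (an assumption). */
    static FileUploadGateway newGateway() {
        return (fileName, fileType, timeout) -> {
            if (timeout.isNegative() || timeout.isZero()) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalArgumentException("timeout must be positive"));
                return failed;
            }
            // Pretend the upload succeeded and return a fake key.
            return CompletableFuture.completedFuture(fileType + ":" + fileName);
        };
    }

    public static void main(String[] args) {
        String key = newGateway()
                .requestFileUploadByNameAndType(
                        "profile.html", FileType.PROFILER, Duration.ofSeconds(10))
                .join();
        System.out.println(key);
    }
}
```

   With `Duration`, callers write `Duration.ofSeconds(10)` from the JDK rather than a Flink-specific time type.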



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##########
@@ -282,4 +284,30 @@ CompletableFuture<ThreadDumpInfo> requestThreadDump(
      */
     CompletableFuture<TaskExecutorThreadInfoGateway> 
requestTaskExecutorThreadInfoGateway(
             ResourceID taskManagerId, @RpcTimeout Time timeout);
+
+    /**
+     * Request profiling list from the given {@link TaskExecutor}.
+     *
+     * @param taskManagerId identifying the {@link TaskExecutor} to get 
profiling list from
+     * @param timeout for the asynchronous operation
+     * @return Future which is completed with the historical profiling list
+     */
+    CompletableFuture<Collection<ProfilingInfo>> 
requestTaskManagerProfilingList(
+            ResourceID taskManagerId, @RpcTimeout Time timeout);

Review Comment:
   Please use `Duration` for newly added interfaces.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerProfilingFileHandler.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ProfilingFileNamePathParameter;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerProfilingFileMessageParameters;
+import org.apache.flink.runtime.taskexecutor.FileType;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Rest handler which serves the profiling result file of the {@link 
TaskExecutor}. */
+public class TaskManagerProfilingFileHandler

Review Comment:
   Please use the `Duration` class in `TaskManagerProfilingHandler`, 
`TaskManagerProfilingListHandler` and `TaskManagerProfilingFileHandler` if 
possible.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java:
##########
@@ -225,17 +225,34 @@ CompletableFuture<TransientBlobKey> 
requestTaskManagerFileUploadByType(
 
     /**
      * Request the file upload from the given {@link TaskExecutor} to the 
cluster's {@link
-     * BlobServer}. The corresponding {@link TransientBlobKey} is returned.
+     * BlobServer}. The corresponding {@link TransientBlobKey} is returned. To 
support different
+     * type file upload with name, using {@link
+     * ResourceManager#requestTaskManagerFileUploadByNameAndType} as instead.
      *
      * @param taskManagerId identifying the {@link TaskExecutor} to upload the 
specified file
      * @param fileName name of the file to upload
      * @param timeout for the asynchronous operation
      * @return Future which is completed with the {@link TransientBlobKey} 
after uploading the file

Review Comment:
   If we decide to mark this interface as `Deprecated`, I think we can add 
a description like 
   ~~~java
   @deprecated use {@link 
#requestTaskManagerFileUploadByNameAndType(ResourceID, String, FileType, 
Duration)} instead
   ~~~
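
   For context, a minimal sketch of how the `@deprecated` Javadoc tag and the `@Deprecated` annotation usually appear together. The `Gateway` interface, the shortened method names, the `FileType` stand-in and the delegating `default` method are assumptions for illustration; the PR does not say the old method should delegate to the new one.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class DeprecationSketch {

    /** Hypothetical stand-in for Flink's FileType. */
    enum FileType { LOG }

    /** Hypothetical gateway with an old method and its replacement. */
    interface Gateway {

        /**
         * Requests a file upload by name.
         *
         * @deprecated use {@link #requestUploadByNameAndType(String, FileType, Duration)} instead
         */
        @Deprecated
        default CompletableFuture<String> requestUploadByName(String fileName, Duration timeout) {
            // Assumption: the old entry point forwards to the new one with a default type.
            return requestUploadByNameAndType(fileName, FileType.LOG, timeout);
        }

        CompletableFuture<String> requestUploadByNameAndType(
                String fileName, FileType fileType, Duration timeout);
    }

    /** Toy implementation that returns a fake key instead of uploading anything. */
    static Gateway newGateway() {
        return (fileName, fileType, timeout) ->
                CompletableFuture.completedFuture(fileType + ":" + fileName);
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Gateway gateway = newGateway();
        System.out.println(gateway.requestUploadByName("taskmanager.log", Duration.ofSeconds(5)).join());
    }
}
```

   The Javadoc tag tells readers what to call instead, while the annotation makes the compiler warn at call sites.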



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java:
##########
@@ -313,4 +314,22 @@ CompletableFuture<Acknowledge> sendOperatorEventToTask(
      */
     CompletableFuture<Acknowledge> updateDelegationTokens(
             ResourceManagerId resourceManagerId, byte[] tokens);
+
+    /**
+     * Requests the profiling from this TaskManager.
+     *
+     * @param duration profiling duration
+     * @param mode profiling mode {@link ProfilingInfo.ProfilingMode}
+     * @param timeout timeout for the asynchronous operation
+     * @return the {@link ProfilingInfo} for this TaskManager.
+     */
+    CompletableFuture<ProfilingInfo> requestProfiling(
+            int duration, ProfilingInfo.ProfilingMode mode, @RpcTimeout Time 
timeout);

Review Comment:
   Please also change the `timeout` parameter type to `Duration`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

