This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 90f3c4d  Sniffer processing profile task and report status and 
snapshot (#4220)
90f3c4d is described below

commit 90f3c4de557b427426dc33a86f3223be6a831794
Author: mrproliu <741550...@qq.com>
AuthorDate: Sat Jan 18 18:38:03 2020 +0800

    Sniffer processing profile task and report status and snapshot (#4220)
    
    * sniffer processing profile task and report status and snapshot
    
    * resolve testServiceDependencies test case error, use same register with 
`TraceSegmentServiceClient`
    
    * resolve names
    
    * change profile to single one thread run.
    
    * 1. change to the ArrayList, because the max size is known
    2. rename issue resolved
    
    * add profiling status enum
    
    * change sniffer use full name issue
    
    * 1. remove `prepareProfiling` method, build profiling status when 
construct `TracingContext`
    2. add `TracingThreadListenerManager`, notify when tracing main thread 
finish
    3. change ProfileThread start when process new profile task
    
    * remove unnecessary getter
    
    * add test assert error message
    
    * adding `AgentServiceRule`
    
    * revert original assert
    
    * remove unnecessary getter
    
    * resolve issues
    
    * reduce findService invoke
    
    * resolve style error
    
    * recheck profiling when the first span operation name changes
    
    * resolve issues
    1. remove `ContextManager#profilingRecheck`, only check on `TracingContext`
    2. rename comments
    3. resolve volatile array setting
    
    * remove article link
    
    * add `ProfileTask#maxSamplingCount` check
    
    * resolve conflict (Downstream -> Commands)
    
    * 1. change profilingSegmentSlots init on construct
    2. if already profiling, recheck doesn't need to stop
    3. total profiling count increment on first dump
    
    * remove unused return val
    
    * remove some `@param` and `@return`
    
    * add profile task check result data bean
    
    * change profiler slot to `AtomicReferenceArray`
    
    * resolved java doc error
    
    * fix doc error, remove meaningless descriptions
    
    * resolve missed profile receiver on oap starter
    
    * resolve method invoke error
    
    Co-authored-by: 吴晟 Wu Sheng <wu.sh...@foxmail.com>
    Co-authored-by: kezhenxu94 <kezhenx...@163.com>
---
 .../component/command/ProfileTaskCommand.java      |  18 +-
 .../executor/ProfileTaskCommandExecutor.java       |   3 +-
 .../skywalking/apm/agent/core/conf/Config.java     |  20 ++
 .../agent/core/context/AbstractTracerContext.java  |   1 +
 .../core/context/ContextManagerExtendService.java  |   2 +-
 .../apm/agent/core/context/TracingContext.java     | 100 ++++++++-
 .../agent/core/context/TracingThreadListener.java  |  24 +-
 .../core/context/trace/AbstractTracingSpan.java    |  19 +-
 .../apm/agent/core/context/trace/EntrySpan.java    |   9 +-
 .../apm/agent/core/context/trace/ExitSpan.java     |  17 +-
 .../apm/agent/core/context/trace/LocalSpan.java    |  10 +-
 .../core/context/trace/StackBasedTracingSpan.java  |  25 ++-
 .../apm/agent/core/profile/ProfileTask.java        |  28 ++-
 .../core/profile/ProfileTaskChannelService.java    | 243 +++++++++++++++++++++
 .../core/profile/ProfileTaskExecutionContext.java  | 119 +++++++++-
 .../core/profile/ProfileTaskExecutionService.java  | 124 ++++++++---
 .../core/profile/ProfileTaskQueryService.java      | 129 -----------
 .../apm/agent/core/profile/ProfileThread.java      | 115 ++++++++++
 .../apm/agent/core/profile/ProfilingStatus.java    |  24 +-
 .../apm/agent/core/profile/ThreadProfiler.java     | 153 +++++++++++++
 .../agent/core/profile/TracingThreadSnapshot.java  |  73 +++++++
 ...ache.skywalking.apm.agent.core.boot.BootService |   2 +-
 .../apm/agent/core/boot/ServiceManagerTest.java    |  18 +-
 .../apm/agent/core/context/TracingContextTest.java |   2 +-
 .../agent/core/test/tools/AgentServiceRule.java    |   3 +
 apm-sniffer/config/agent.config                    |  12 +
 docs/en/setup/service-agent/java-agent/README.md   |   4 +
 .../src/main/resources/application.yml             |   3 +
 .../oap/server/core/cache/ProfileTaskCache.java    |  45 +++-
 .../oap/server/core/command/CommandService.java    |   2 +-
 .../profile/ProfileTaskSegmentSnapshotRecord.java  | 101 +++++++++
 .../oap/server/core/source/DefaultScopeDefine.java |   1 +
 .../core/storage/profile/IProfileTaskQueryDAO.java |   7 +
 .../handler/ProfileTaskServiceHandler.java         |  77 ++++++-
 .../elasticsearch/query/ProfileTaskQueryEsDAO.java |  19 ++
 .../plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java  |  24 ++
 .../skywalking/e2e/ProfileVerificationITCase.java  |  40 +++-
 .../skywalking/e2e/ProfileVerificationITCase.java  |  40 +++-
 .../skywalking/e2e/ProfileVerificationITCase.java  |  40 +++-
 .../{TestController.java => CreateUser.java}       |  36 +--
 .../skywalking/e2e/profile/TestController.java     |  14 +-
 ...leVerificationITCase.profileTasks.finished.yml} |   5 +
 ...leVerificationITCase.profileTasks.notified.yml} |   0
 43 files changed, 1419 insertions(+), 332 deletions(-)

diff --git 
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
 
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
index fecb876..f595fda 100644
--- 
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
+++ 
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
@@ -27,10 +27,11 @@ import java.util.List;
  * @author MrPro
  */
 public class ProfileTaskCommand extends BaseCommand implements Serializable, 
Deserializable<ProfileTaskCommand> {
-    public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new 
ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0);
+    public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new 
ProfileTaskCommand("", "", "", 0, 0, 0, 0, 0, 0);
     public static final String NAME = "ProfileTaskQuery";
 
     // profile task data
+    private String taskId;
     private String endpointName;
     private int duration;
     private int minDurationThreshold;
@@ -39,8 +40,9 @@ public class ProfileTaskCommand extends BaseCommand 
implements Serializable, Des
     private long startTime;
     private long createTime;
 
-    public ProfileTaskCommand(String serialNumber, String endpointName, int 
duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long 
startTime, long createTime) {
+    public ProfileTaskCommand(String serialNumber, String taskId, String 
endpointName, int duration, int minDurationThreshold, int dumpPeriod, int 
maxSamplingCount, long startTime, long createTime) {
         super(NAME, serialNumber);
+        this.taskId = taskId;
         this.endpointName = endpointName;
         this.duration = duration;
         this.minDurationThreshold = minDurationThreshold;
@@ -54,6 +56,7 @@ public class ProfileTaskCommand extends BaseCommand 
implements Serializable, Des
     public ProfileTaskCommand deserialize(Command command) {
         final List<KeyStringValuePair> argsList = command.getArgsList();
         String serialNumber = null;
+        String taskId = null;
         String endpointName = null;
         int duration = 0;
         int minDurationThreshold = 0;
@@ -67,6 +70,8 @@ public class ProfileTaskCommand extends BaseCommand 
implements Serializable, Des
                 serialNumber = pair.getValue();
             } else if ("EndpointName".equals(pair.getKey())) {
                 endpointName = pair.getValue();
+            } else if ("TaskId".equals(pair.getKey())) {
+                taskId = pair.getValue();
             } else if ("Duration".equals(pair.getKey())) {
                 duration = Integer.parseInt(pair.getValue());
             } else if ("MinDurationThreshold".equals(pair.getKey())) {
@@ -82,13 +87,14 @@ public class ProfileTaskCommand extends BaseCommand 
implements Serializable, Des
             }
         }
 
-        return new ProfileTaskCommand(serialNumber, endpointName, duration, 
minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime);
+        return new ProfileTaskCommand(serialNumber, taskId, endpointName, 
duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, 
createTime);
     }
 
     @Override
     public Command.Builder serialize() {
         final Command.Builder builder = commandBuilder();
-        
builder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
+        
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
+                
.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
                 
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
                 
.addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold)))
                 
.addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod)))
@@ -125,4 +131,8 @@ public class ProfileTaskCommand extends BaseCommand 
implements Serializable, Des
     public long getCreateTime() {
         return createTime;
     }
+
+    public String getTaskId() {
+        return taskId;
+    }
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
index c6c22d2..eb6212c 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
@@ -39,7 +39,8 @@ public class ProfileTaskCommandExecutor implements 
CommandExecutor {
 
         // build profile task
         final ProfileTask profileTask = new ProfileTask();
-        profileTask.setEndpointName(profileTaskCommand.getEndpointName());
+        profileTask.setTaskId(profileTaskCommand.getTaskId());
+        profileTask.setFistSpanOPName(profileTaskCommand.getEndpointName());
         profileTask.setDuration(profileTaskCommand.getDuration());
         
profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold());
         profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod());
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 084e819..d356e2b 100755
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -138,6 +138,26 @@ public class Config {
          * If true, skywalking agent will enable profile when user create a 
new profile task. Otherwise disable profile.
          */
         public static boolean ACTIVE = true;
+
+        /**
+         * Parallel monitor segment count
+         */
+        public static int MAX_PARALLEL = 5;
+
+        /**
+         * Max monitor segment time(minutes), if current segment monitor time 
out of limit, then stop it.
+         */
+        public static int MAX_DURATION = 10;
+
+        /**
+         * Max dump thread stack depth
+         */
+        public static int DUMP_MAX_STACK_DEPTH = 500;
+
+        /**
+         * Snapshot transport to backend buffer size
+         */
+        public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500;
     }
 
     public static class Jvm {
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
index 4b3cd2d..6c76873 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
@@ -115,4 +115,5 @@ public interface AbstractTracerContext {
      * @param span to be stopped.
      */
     void asyncStop(AsyncSpan span);
+
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
index d25e915..1331f62 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
@@ -51,7 +51,7 @@ public class ContextManagerExtendService implements 
BootService {
         } else {
             SamplingService samplingService = 
ServiceManager.INSTANCE.findService(SamplingService.class);
             if (forceSampling || samplingService.trySampling()) {
-                context = new TracingContext();
+                context = new TracingContext(operationName);
             } else {
                 context = new IgnoredTracerContext();
             }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index ccb9aae..9a9ccd5 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -40,6 +40,7 @@ import 
org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
 import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
 import org.apache.skywalking.apm.agent.core.logging.api.ILog;
 import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import 
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
 import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
 import org.apache.skywalking.apm.util.StringUtil;
 
@@ -62,9 +63,14 @@ public class TracingContext implements AbstractTracerContext 
{
     private long lastWarningTimestamp = 0;
 
     /**
+     * @see {@link ProfileTaskExecutionService}
+     */
+    private static ProfileTaskExecutionService PROFILE_TASK_EXECUTION_SERVICE;
+
+    /**
      * @see {@link SamplingService}
      */
-    private SamplingService samplingService;
+    private static SamplingService SAMPLING_SERVICE;
 
     /**
      * The final {@link TraceSegment}, which includes all finished spans.
@@ -92,15 +98,32 @@ public class TracingContext implements 
AbstractTracerContext {
 
     private volatile boolean running;
 
+    private final long createTime;
+
+    /**
+     * profiling status
+     */
+    private volatile boolean profiling;
+
     /**
      * Initialize all fields with default value.
      */
-    TracingContext() {
+    TracingContext(String firstOPName) {
         this.segment = new TraceSegment();
         this.spanIdGenerator = 0;
-        samplingService = 
ServiceManager.INSTANCE.findService(SamplingService.class);
         isRunningInAsyncMode = false;
+        createTime = System.currentTimeMillis();
         running = true;
+
+        if (SAMPLING_SERVICE == null) {
+            SAMPLING_SERVICE = 
ServiceManager.INSTANCE.findService(SamplingService.class);
+        }
+
+        // profiling status
+        if (PROFILE_TASK_EXECUTION_SERVICE == null) {
+            PROFILE_TASK_EXECUTION_SERVICE = 
ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
+        }
+        this.profiling = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this, 
segment.getTraceSegmentId(), firstOPName);
     }
 
     /**
@@ -308,6 +331,7 @@ public class TracingContext implements 
AbstractTracerContext {
             return push(span);
         }
         AbstractSpan entrySpan;
+        TracingContext owner = this;
         final AbstractSpan parentSpan = peek();
         final int parentSpanId = parentSpan == null ? -1 : 
parentSpan.getSpanId();
         if (parentSpan != null && parentSpan.isEntry()) {
@@ -328,11 +352,11 @@ public class TracingContext implements 
AbstractTracerContext {
                 .findOnly(segment.getServiceId(), operationName)
                 .doInCondition(new PossibleFound.FoundAndObtain() {
                     @Override public Object doProcess(int operationId) {
-                        return new EntrySpan(spanIdGenerator++, parentSpanId, 
operationId);
+                        return new EntrySpan(spanIdGenerator++, parentSpanId, 
operationId, owner);
                     }
                 }, new PossibleFound.NotFoundAndObtain() {
                     @Override public Object doProcess() {
-                        return new EntrySpan(spanIdGenerator++, parentSpanId, 
operationName);
+                        return new EntrySpan(spanIdGenerator++, parentSpanId, 
operationName, owner);
                     }
                 });
             entrySpan.start();
@@ -358,7 +382,7 @@ public class TracingContext implements 
AbstractTracerContext {
          * From v6.0.0-beta, local span doesn't do op name register.
          * All op name register is related to entry and exit spans only.
          */
-        AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, 
parentSpanId, operationName);
+        AbstractTracingSpan span = new LocalSpan(spanIdGenerator++, 
parentSpanId, operationName, this);
         span.start();
         return push(span);
     }
@@ -380,6 +404,7 @@ public class TracingContext implements 
AbstractTracerContext {
 
         AbstractSpan exitSpan;
         AbstractSpan parentSpan = peek();
+        TracingContext owner = this;
         if (parentSpan != null && parentSpan.isExit()) {
             exitSpan = parentSpan;
         } else {
@@ -389,13 +414,13 @@ public class TracingContext implements 
AbstractTracerContext {
                     new PossibleFound.FoundAndObtain() {
                         @Override
                         public Object doProcess(final int peerId) {
-                            return new ExitSpan(spanIdGenerator++, 
parentSpanId, operationName, peerId);
+                            return new ExitSpan(spanIdGenerator++, 
parentSpanId, operationName, peerId, owner);
                         }
                     },
                     new PossibleFound.NotFoundAndObtain() {
                         @Override
                         public Object doProcess() {
-                            return new ExitSpan(spanIdGenerator++, 
parentSpanId, operationName, remotePeer);
+                            return new ExitSpan(spanIdGenerator++, 
parentSpanId, operationName, remotePeer, owner);
                         }
                     });
             push(exitSpan);
@@ -463,15 +488,38 @@ public class TracingContext implements 
AbstractTracerContext {
     }
 
     /**
+     * Re-check whether the current trace needs profiling, in case a third-party plugin changes 
the operation name.
+     *
+     * @param span current modify span
+     * @param operationName change to operation name
+     */
+    public void profilingRecheck(AbstractSpan span, String operationName) {
+        // only recheck first span
+        if (span.getSpanId() != 0) {
+            return;
+        }
+
+        profiling = PROFILE_TASK_EXECUTION_SERVICE.profilingRecheck(this, 
segment.getTraceSegmentId(), operationName);
+    }
+
+    /**
      * Finish this context, and notify all {@link TracingContextListener}s, 
managed by {@link
-     * TracingContext.ListenerManager}
+     * TracingContext.ListenerManager} and {@link 
TracingContext.TracingThreadListenerManager}
      */
     private void finish() {
         if (isRunningInAsyncMode) {
             asyncFinishLock.lock();
         }
         try {
-            if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode 
|| asyncSpanCounter.get() == 0)) {
+            boolean isFinishedInMainThread = activeSpanStack.isEmpty() && 
running;
+            if (isFinishedInMainThread) {
+                /**
+                 * Notify after tracing finished in the main thread.
+                 */
+                TracingThreadListenerManager.notifyFinish(this);
+            }
+
+            if (isFinishedInMainThread && (!isRunningInAsyncMode || 
asyncSpanCounter.get() == 0)) {
                 TraceSegment finishedSegment = 
segment.finish(isLimitMechanismWorking());
                 /*
                  * Recheck the segment if the segment contains only one span.
@@ -480,7 +528,7 @@ public class TracingContext implements 
AbstractTracerContext {
                  * @see {@link #createSpan(String, long, boolean)}
                  */
                 if (!segment.hasRef() && segment.isSingleSpanSegment()) {
-                    if (!samplingService.trySampling()) {
+                    if (!SAMPLING_SERVICE.trySampling()) {
                         finishedSegment.setIgnore(true);
                     }
                 }
@@ -544,6 +592,27 @@ public class TracingContext implements 
AbstractTracerContext {
     }
 
     /**
+     * The <code>TracingThreadListenerManager</code> notifies every registered 
{@link TracingThreadListener} after tracing finishes in the main thread.
+     */
+    public static class TracingThreadListenerManager {
+        private static List<TracingThreadListener> LISTENERS = new 
LinkedList<>();
+
+        public static synchronized void add(TracingThreadListener listener) {
+            LISTENERS.add(listener);
+        }
+
+        static void notifyFinish(TracingContext finishedContext) {
+            for (TracingThreadListener listener : LISTENERS) {
+                listener.afterMainThreadFinish(finishedContext);
+            }
+        }
+
+        public static synchronized void remove(TracingThreadListener listener) 
{
+            LISTENERS.remove(listener);
+        }
+    }
+
+    /**
      * @return the top element of 'ActiveSpanStack', and remove it.
      */
     private AbstractSpan pop() {
@@ -587,4 +656,13 @@ public class TracingContext implements 
AbstractTracerContext {
             return false;
         }
     }
+
+    public long createTime() {
+        return this.createTime;
+    }
+
+    public boolean isProfiling() {
+        return this.profiling;
+    }
+
 }
diff --git 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
similarity index 59%
copy from 
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to 
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
index c300fc3..c175454 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
@@ -16,30 +16,12 @@
  *
  */
 
-package org.apache.skywalking.e2e.profile;
-
-import org.springframework.web.bind.annotation.*;
+package org.apache.skywalking.apm.agent.core.context;
 
 /**
  * @author MrPro
  */
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
-    private final UserRepo userRepo;
-
-    public TestController(final UserRepo userRepo) {
-        this.userRepo = userRepo;
-    }
-
-    @GetMapping("/health-check")
-    public String hello() {
-        return "healthy";
-    }
+public interface TracingThreadListener {
 
-    @PostMapping("/users")
-    public User createAuthor(@RequestBody final User user) throws 
InterruptedException {
-        Thread.sleep(1000L);
-        return userRepo.save(user);
-    }
+    void afterMainThreadFinish(TracingContext tracingContext);
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
index b5ece98..4eec067 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
@@ -48,7 +48,11 @@ public abstract class AbstractTracingSpan implements 
AbstractSpan {
      * The flag represents whether the span has been async stopped
      */
     private volatile boolean isAsyncStopped = false;
-    protected volatile AbstractTracerContext context;
+
+    /**
+     * The context to which the span belongs
+     */
+    protected final TracingContext owner;
 
     /**
      * The start time of this Span.
@@ -79,18 +83,20 @@ public abstract class AbstractTracingSpan implements 
AbstractSpan {
      */
     protected List<TraceSegmentRef> refs;
 
-    protected AbstractTracingSpan(int spanId, int parentSpanId, String 
operationName) {
+    protected AbstractTracingSpan(int spanId, int parentSpanId, String 
operationName, TracingContext owner) {
         this.operationName = operationName;
         this.operationId = DictionaryUtil.nullValue();
         this.spanId = spanId;
         this.parentSpanId = parentSpanId;
+        this.owner = owner;
     }
 
-    protected AbstractTracingSpan(int spanId, int parentSpanId, int 
operationId) {
+    protected AbstractTracingSpan(int spanId, int parentSpanId, int 
operationId, TracingContext owner) {
         this.operationName = null;
         this.operationId = operationId;
         this.spanId = spanId;
         this.parentSpanId = parentSpanId;
+        this.owner = owner;
     }
 
     /**
@@ -203,6 +209,9 @@ public abstract class AbstractTracingSpan implements 
AbstractSpan {
     public AbstractTracingSpan setOperationName(String operationName) {
         this.operationName = operationName;
         this.operationId = DictionaryUtil.nullValue();
+
+        // recheck profiling status
+        owner.profilingRecheck(this, operationName);
         return this;
     }
 
@@ -332,7 +341,7 @@ public abstract class AbstractTracingSpan implements 
AbstractSpan {
         if (isInAsyncMode) {
             throw new RuntimeException("Prepare for async repeatedly. Span is 
already in async mode.");
         }
-        context = ContextManager.awaitFinishAsync(this);
+        ContextManager.awaitFinishAsync(this);
         isInAsyncMode = true;
         return this;
     }
@@ -345,7 +354,7 @@ public abstract class AbstractTracingSpan implements 
AbstractSpan {
             throw new RuntimeException("Can not do async finish for the span 
repeately.");
         }
         this.endTime = System.currentTimeMillis();
-        context.asyncStop(this);
+        owner.asyncStop(this);
         isAsyncStopped = true;
         return this;
     }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
index bef43e4..7d83af9 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.agent.core.context.trace;
 
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
 import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
 import org.apache.skywalking.apm.network.trace.component.Component;
 
@@ -37,13 +38,13 @@ public class EntrySpan extends StackBasedTracingSpan {
 
     private int currentMaxDepth;
 
-    public EntrySpan(int spanId, int parentSpanId, String operationName) {
-        super(spanId, parentSpanId, operationName);
+    public EntrySpan(int spanId, int parentSpanId, String operationName, 
TracingContext owner) {
+        super(spanId, parentSpanId, operationName, owner);
         this.currentMaxDepth = 0;
     }
 
-    public EntrySpan(int spanId, int parentSpanId, int operationId) {
-        super(spanId, parentSpanId, operationId);
+    public EntrySpan(int spanId, int parentSpanId, int operationId, 
TracingContext owner) {
+        super(spanId, parentSpanId, operationId, owner);
         this.currentMaxDepth = 0;
     }
 
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
index aa0089a..d081a78 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
@@ -19,6 +19,7 @@
 
 package org.apache.skywalking.apm.agent.core.context.trace;
 
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
 import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
 import org.apache.skywalking.apm.network.trace.component.Component;
 
@@ -37,20 +38,20 @@ import 
org.apache.skywalking.apm.network.trace.component.Component;
  */
 public class ExitSpan extends StackBasedTracingSpan implements WithPeerInfo {
 
-    public ExitSpan(int spanId, int parentSpanId, String operationName, String 
peer) {
-        super(spanId, parentSpanId, operationName, peer);
+    public ExitSpan(int spanId, int parentSpanId, String operationName, String 
peer, TracingContext owner) {
+        super(spanId, parentSpanId, operationName, peer, owner);
     }
 
-    public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId) 
{
-        super(spanId, parentSpanId, operationId, peerId);
+    public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId, 
TracingContext owner) {
+        super(spanId, parentSpanId, operationId, peerId, owner);
     }
 
-    public ExitSpan(int spanId, int parentSpanId, int operationId, String 
peer) {
-        super(spanId, parentSpanId, operationId, peer);
+    public ExitSpan(int spanId, int parentSpanId, int operationId, String 
peer, TracingContext owner) {
+        super(spanId, parentSpanId, operationId, peer, owner);
     }
 
-    public ExitSpan(int spanId, int parentSpanId, String operationName, int 
peerId) {
-        super(spanId, parentSpanId, operationName, peerId);
+    public ExitSpan(int spanId, int parentSpanId, String operationName, int 
peerId, TracingContext owner) {
+        super(spanId, parentSpanId, operationName, peerId, owner);
     }
 
     /**
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
index 1bd4a77..be4c4cc 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
@@ -19,6 +19,8 @@
 
 package org.apache.skywalking.apm.agent.core.context.trace;
 
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+
 /**
  * The <code>LocalSpan</code> represents a normal tracing point, such as a 
local method.
  *
@@ -26,12 +28,12 @@ package org.apache.skywalking.apm.agent.core.context.trace;
  */
 public class LocalSpan extends AbstractTracingSpan {
 
-    public LocalSpan(int spanId, int parentSpanId, int operationId) {
-        super(spanId, parentSpanId, operationId);
+    public LocalSpan(int spanId, int parentSpanId, int operationId, 
TracingContext owner) {
+        super(spanId, parentSpanId, operationId, owner);
     }
 
-    public LocalSpan(int spanId, int parentSpanId, String operationName) {
-        super(spanId, parentSpanId, operationName);
+    public LocalSpan(int spanId, int parentSpanId, String operationName, 
TracingContext owner) {
+        super(spanId, parentSpanId, operationName, owner);
     }
 
     @Override
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
index cdf26a8..19000fb 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.apm.agent.core.context.trace;
 
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
 import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
 import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
 import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
@@ -35,40 +36,40 @@ public abstract class StackBasedTracingSpan extends 
AbstractTracingSpan {
     protected String peer;
     protected int peerId;
 
-    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName) {
-        super(spanId, parentSpanId, operationName);
+    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName, TracingContext owner) {
+        super(spanId, parentSpanId, operationName, owner);
         this.stackDepth = 0;
         this.peer = null;
         this.peerId = DictionaryUtil.nullValue();
     }
 
-    protected StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId) {
-        super(spanId, parentSpanId, operationId);
+    protected StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId, TracingContext owner) {
+        super(spanId, parentSpanId, operationId, owner);
         this.stackDepth = 0;
         this.peer = null;
         this.peerId = DictionaryUtil.nullValue();
     }
 
-    public StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId, int peerId) {
-        super(spanId, parentSpanId, operationId);
+    public StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId, int peerId, TracingContext owner) {
+        super(spanId, parentSpanId, operationId, owner);
         this.peer = null;
         this.peerId = peerId;
     }
 
-    public StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId, String peer) {
-        super(spanId, parentSpanId, operationId);
+    public StackBasedTracingSpan(int spanId, int parentSpanId, int 
operationId, String peer, TracingContext owner) {
+        super(spanId, parentSpanId, operationId, owner);
         this.peer = peer;
         this.peerId = DictionaryUtil.nullValue();
     }
 
-    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName, String peer) {
-        super(spanId, parentSpanId, operationName);
+    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName, String peer, TracingContext owner) {
+        super(spanId, parentSpanId, operationName, owner);
         this.peer = peer;
         this.peerId = DictionaryUtil.nullValue();
     }
 
-    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName, int peerId) {
-        super(spanId, parentSpanId, operationName);
+    protected StackBasedTracingSpan(int spanId, int parentSpanId, String 
operationName, int peerId, TracingContext owner) {
+        super(spanId, parentSpanId, operationName, owner);
         this.peer = null;
         this.peerId = peerId;
     }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
index 8669c7d..2c4c6c1 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
@@ -27,8 +27,11 @@ import java.util.Objects;
  */
 public class ProfileTask {
 
-    // monitor endpoint name
-    private String endpointName;
+    // task id
+    private String taskId;
+
+    // monitor first span operation name
+    private String fistSpanOPName;
 
     // task duration (minute)
     private int duration;
@@ -48,12 +51,12 @@ public class ProfileTask {
     // task create time
     private long createTime;
 
-    public String getEndpointName() {
-        return endpointName;
+    public String getFistSpanOPName() {
+        return fistSpanOPName;
     }
 
-    public void setEndpointName(String endpointName) {
-        this.endpointName = endpointName;
+    public void setFistSpanOPName(String fistSpanOPName) {
+        this.fistSpanOPName = fistSpanOPName;
     }
 
     public int getDuration() {
@@ -104,6 +107,14 @@ public class ProfileTask {
         this.createTime = createTime;
     }
 
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -115,11 +126,12 @@ public class ProfileTask {
                 maxSamplingCount == that.maxSamplingCount &&
                 startTime == that.startTime &&
                 createTime == that.createTime &&
-                endpointName.equals(that.endpointName);
+                taskId.equals(that.taskId) &&
+                fistSpanOPName.equals(that.fistSpanOPName);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(endpointName, duration, minDurationThreshold, 
threadDumpPeriod, maxSamplingCount, startTime, createTime);
+        return Objects.hash(taskId, fistSpanOPName, duration, 
minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime, 
createTime);
     }
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
new file mode 100644
index 0000000..9a4b839
--- /dev/null
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import io.grpc.Channel;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
+import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.remote.*;
+import org.apache.skywalking.apm.network.common.Commands;
+import 
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import 
org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
+/**
+ * Communication service between the sniffer and the backend for the profile 
task protocol.
+ * 1. The sniffer queries the backend for new profile tasks every {@link 
Config.Collector#GET_PROFILE_TASK_INTERVAL} seconds.
+ * 2. New profile task snapshots are buffered in a {@link LinkedBlockingQueue} 
and transferred to the backend.
+ * 3. When a profiling task finishes, its finish status is sent to the backend.
+ *
+ * @author MrPro
+ */
+@DefaultImplementor
+public class ProfileTaskChannelService implements BootService, Runnable, 
GRPCChannelListener {
+    private static final ILog logger = 
LogManager.getLogger(ProfileTaskChannelService.class);
+
+    // channel status
+    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
+
+    // gRPC stub
+    private volatile ProfileTaskGrpc.ProfileTaskBlockingStub 
profileTaskBlockingStub;
+    private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub;
+
+    // segment snapshot sender
+    private final LinkedBlockingQueue<TracingThreadSnapshot> snapshotQueue = 
new LinkedBlockingQueue<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
+    private volatile ScheduledFuture<?> sendSnapshotFuture;
+
+    // query task list schedule
+    private volatile ScheduledFuture<?> getTaskListFuture;
+
+    @Override
+    public void run() {
+        if (RemoteDownstreamConfig.Agent.SERVICE_ID != 
DictionaryUtil.nullValue()
+                && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != 
DictionaryUtil.nullValue()
+        ) {
+            if (status == GRPCChannelStatus.CONNECTED) {
+                try {
+                    ProfileTaskCommandQuery.Builder builder = 
ProfileTaskCommandQuery.newBuilder();
+
+                    // sniffer identity: service id and service instance id
+                    
builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
+
+                    // last command create time, as tracked by ProfileTaskExecutionService
+                    
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
+
+                    Commands commands = 
profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
+                    
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
+                } catch (Throwable t) {
+                    if (!(t instanceof StatusRuntimeException)) {
+                        logger.error(t, "Query profile task from backend 
fail.");
+                        return; // not a gRPC status failure: log it, nothing is cancelled here
+                    }
+                    final StatusRuntimeException statusRuntimeException = 
(StatusRuntimeException) t;
+                    if (statusRuntimeException.getStatus().getCode() == 
Status.Code.UNIMPLEMENTED) {
+                        logger.warn("Backend doesn't support profiling, 
profiling will be disabled");
+                        if (getTaskListFuture != null) {
+                            getTaskListFuture.cancel(true);
+                        }
+
+                        // also cancel the snapshot sender schedule
+                        if (sendSnapshotFuture != null) {
+                            sendSnapshotFuture.cancel(true);
+                        }
+                    } // any other gRPC status code is ignored without logging
+                }
+            }
+        }
+
+    }
+
+    @Override
+    public void prepare() throws Throwable {
+        
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
+    }
+
+    @Override
+    public void boot() throws Throwable {
+        if (Config.Profile.ACTIVE) {
+            // query the task list on its own single-thread scheduler, with GET_PROFILE_TASK_INTERVAL seconds between runs
+            getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new 
DefaultNamedThreadFactory("ProfileGetTaskService"))
+                    .scheduleWithFixedDelay(new 
RunnableWithExceptionProtection(this, new 
RunnableWithExceptionProtection.CallbackWhenException() {
+                        @Override
+                        public void handle(Throwable t) {
+                            logger.error("Query profile task list failure.", 
t);
+                        }
+                    }), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL, 
TimeUnit.SECONDS);
+            // upload buffered snapshots on a second single-thread scheduler, with 500 ms between runs
+            sendSnapshotFuture = 
Executors.newSingleThreadScheduledExecutor(new 
DefaultNamedThreadFactory("ProfileSendSnapshotService"))
+                    .scheduleWithFixedDelay(new 
RunnableWithExceptionProtection(new SnapshotSender(), new 
RunnableWithExceptionProtection.CallbackWhenException() {
+                        @Override public void handle(Throwable t) {
+                            logger.error("Profile segment snapshot upload 
failure.", t);
+                        }
+                    }), 0, 500, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    @Override
+    public void onComplete() throws Throwable {
+    }
+
+    @Override
+    public void shutdown() throws Throwable {
+        if (getTaskListFuture != null) {
+            getTaskListFuture.cancel(true);
+        }
+
+        if (sendSnapshotFuture != null) {
+            sendSnapshotFuture.cancel(true);
+        }
+    }
+
+    @Override
+    public void statusChanged(GRPCChannelStatus status) {
+        if (GRPCChannelStatus.CONNECTED.equals(status)) {
+            Channel channel = 
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
+            profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
+            profileTaskStub = ProfileTaskGrpc.newStub(channel);
+        } else {
+            profileTaskBlockingStub = null; // any status other than CONNECTED drops both stubs
+            profileTaskStub = null;
+        }
+        this.status = status; // written after the stubs; NOTE(review): on disconnect a concurrent reader may briefly see CONNECTED with null stubs - confirm this is acceptable
+    }
+
+    /**
+     * add a new profiling snapshot to {@link #snapshotQueue}; when the bounded queue is full the snapshot is dropped instead of throwing into the caller
+     */
+    public void addProfilingSnapshot(TracingThreadSnapshot snapshot) {
+        snapshotQueue.offer(snapshot);
+    }
+
+    /**
+     * notify the backend that the given profile task has finished; any failure (including a null stub while disconnected) is logged and never thrown
+     */
+    public void notifyProfileTaskFinish(ProfileTask task) {
+        try {
+            final ProfileTaskFinishReport.Builder reportBuilder = 
ProfileTaskFinishReport.newBuilder();
+            // sniffer identity: service id and service instance id
+            
reportBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
+            // id of the finished task
+            reportBuilder.setTaskId(task.getTaskId());
+
+            // send through the blocking stub with a GRPC_UPSTREAM_TIMEOUT seconds deadline
+            profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).reportTaskFinish(reportBuilder.build());
+        } catch (Throwable e) {
+            logger.error(e, "Notify profile task finish to backend fail.");
+        }
+    }
+
+    /**
+     * send segment snapshots: while the channel is CONNECTED, drain {@link #snapshotQueue} and stream the drained batch to the backend in one collectSnapshot call; a batch whose send fails is not re-queued
+     */
+    private class SnapshotSender implements Runnable {
+
+        @Override
+        public void run() {
+            if (status == GRPCChannelStatus.CONNECTED) {
+                try {
+                    ArrayList<TracingThreadSnapshot> buffer = new 
ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
+                    snapshotQueue.drainTo(buffer);
+                    if (buffer.size() > 0) {
+                        final GRPCStreamServiceStatus status = new 
GRPCStreamServiceStatus(false); // local; shadows the outer channel status field from here on
+                        StreamObserver<ThreadSnapshot> snapshotStreamObserver 
= profileTaskStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).collectSnapshot(new StreamObserver<Commands>() {
+                            @Override
+                            public void onNext(Commands commands) {
+                            }
+
+                            @Override
+                            public void onError(Throwable throwable) {
+                                status.finished();
+                                if (logger.isErrorEnable()) {
+                                    logger.error(throwable, "Send profile 
segment snapshot to collector fail with a grpc internal exception.");
+                                }
+                                
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
+                            }
+
+                            @Override
+                            public void onCompleted() {
+                                status.finished();
+                            }
+                        });
+                        for (TracingThreadSnapshot snapshot : buffer) {
+                            final ThreadSnapshot transformSnapshot = 
snapshot.transform();
+                            snapshotStreamObserver.onNext(transformSnapshot);
+                        }
+
+                        snapshotStreamObserver.onCompleted();
+                        status.wait4Finish(); // presumably blocks until onError/onCompleted above calls finished() - confirm in GRPCStreamServiceStatus
+                    }
+                } catch (Throwable t) {
+                    logger.error(t, "Send profile segment snapshot to backend 
fail.");
+                }
+            }
+        }
+
+    }
+}
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
index 20b9eec..b17414c 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
@@ -18,7 +18,15 @@
 
 package org.apache.skywalking.apm.agent.core.profile;
 
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
 
 /**
  * profile task execution context, it will create on process this profile task
@@ -30,20 +38,119 @@ public class ProfileTaskExecutionContext {
     // task data
     private final ProfileTask task;
 
-    // task real start time
-    private final long startTime;
+    // record current profiling count, use this to check has available profile 
slot
+    private final AtomicInteger currentProfilingCount = new AtomicInteger(0);
+
+    // profiling segment slot
+    private volatile AtomicReferenceArray<ThreadProfiler> 
profilingSegmentSlots;
+
+    // current profiling execution future
+    private volatile Future profilingFuture;
 
-    public ProfileTaskExecutionContext(ProfileTask task, long startTime) {
+    // total started profiling tracing context count
+    private final AtomicInteger totalStartedProfilingCount = new 
AtomicInteger(0);
+
+    public ProfileTaskExecutionContext(ProfileTask task) {
         this.task = task;
-        this.startTime = startTime;
+        profilingSegmentSlots = new 
AtomicReferenceArray<>(Config.Profile.MAX_PARALLEL);
+    }
+
+    /**
+     * start profiling this task: submit a {@link ProfileThread} for this context to the given executor and keep the returned future for {@link #stopProfiling()}
+     */
+    public void startProfiling(ExecutorService executorService) {
+        profilingFuture = executorService.submit(new ProfileThread(this));
+    }
+
+    /**
+     * stop profiling: cancel the submitted profiling future, interrupting it if it is running; does nothing when profiling was never started
+     */
+    public void stopProfiling() {
+        if (profilingFuture != null) {
+            profilingFuture.cancel(true);
+        }
+    }
+
+    /**
+     * try to reserve a profiling slot of this task for the given tracing context and register a {@link ThreadProfiler} for the calling thread
+     *
+     * @return true when a slot was reserved; false when all slots are busy, the operation name does not match, the sampling limit is exceeded, or the reservation lost a race
+     */
+    public boolean attemptProfiling(TracingContext tracingContext, ID 
traceSegmentId, String firstSpanOPName) {
+        // give up when all MAX_PARALLEL slots are already in use
+        final int usingSlotCount = currentProfilingCount.get();
+        if (usingSlotCount >= Config.Profile.MAX_PARALLEL) {
+            return false;
+        }
+
+        // only segments whose first span operation name equals the task's are profiled
+        if (!Objects.equals(task.getFistSpanOPName(), firstSpanOPName)) {
+            return false;
+        }
+
+        // stop adding once the started-profiling total has passed the task's max sampling count
+        if (totalStartedProfilingCount.get() > task.getMaxSamplingCount()) {
+            return false;
+        }
+
+        // reserve a slot by CAS on the counter; a concurrent change means giving up, there is no retry
+        if (!currentProfilingCount.compareAndSet(usingSlotCount, 
usingSlotCount + 1)) {
+            return false;
+        }
+        // put the profiler for the calling thread into the first empty slot
+        final ThreadProfiler threadProfiler = new 
ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this);
+        int slotLength = profilingSegmentSlots.length();
+        for (int slot = 0; slot < slotLength; slot++) {
+            if (profilingSegmentSlots.compareAndSet(slot, null, 
threadProfiler)) {
+                break;
+            }
+        }
+        return true; // NOTE(review): also returned if the loop above claimed no slot
+    }
+
+
+    /**
+     * profiling recheck: a context that is already profiling stays so; otherwise this is the same as {@link #attemptProfiling} with the given first span operation name
+     */
+    public boolean profilingRecheck(TracingContext tracingContext, ID 
traceSegmentId, String firstSpanOPName) {
+        // already profiling: report success without reserving another slot
+        if (tracingContext.isProfiling()) {
+            return true;
+        }
+
+        return attemptProfiling(tracingContext, traceSegmentId, 
firstSpanOPName);
+    }
+
+    /**
+     * find the slot whose profiler matches the given tracing context, clear it, stop that profiler and release its slot count; does nothing when no slot matches
+     */
+    public void stopTracingProfile(TracingContext tracingContext) {
+        // scan the slots for the profiler of this tracingContext; only the first match is handled
+        int slotLength = profilingSegmentSlots.length();
+        for (int slot = 0; slot < slotLength; slot++) {
+            ThreadProfiler currentProfiler = profilingSegmentSlots.get(slot);
+            if (currentProfiler != null && 
currentProfiler.matches(tracingContext)) {
+                profilingSegmentSlots.set(slot, null);
+
+                // stop the profiler, then give the slot count back
+                currentProfiler.stopProfiling();
+                currentProfilingCount.addAndGet(-1);
+                break;
+            }
+        }
     }
 
     public ProfileTask getTask() {
         return task;
     }
 
-    public long getStartTime() {
-        return startTime;
+    public AtomicReferenceArray<ThreadProfiler> threadProfilerSlots() {
+        return profilingSegmentSlots;
+    }
+
+    public boolean isStartProfileable() {
+        // counts this call, then reports whether the total now exceeds the max sampling count - NOTE(review): true means "over the limit", which reads inverted against the method name; confirm against the caller
+        return totalStartedProfilingCount.incrementAndGet() > 
task.getMaxSamplingCount();
     }
 
     @Override
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
index 0bf0f50..9a3f270 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
@@ -21,6 +21,10 @@ package org.apache.skywalking.apm.agent.core.profile;
 import org.apache.skywalking.apm.agent.core.boot.BootService;
 import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
 import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
 import org.apache.skywalking.apm.agent.core.logging.api.ILog;
 import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
 import org.apache.skywalking.apm.network.constants.ProfileConstants;
@@ -38,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * @author MrPro
  */
 @DefaultImplementor
-public class ProfileTaskExecutionService implements BootService {
+public class ProfileTaskExecutionService implements BootService, 
TracingThreadListener {
 
     private static final ILog logger = 
LogManager.getLogger(ProfileTaskExecutionService.class);
 
@@ -51,12 +55,14 @@ public class ProfileTaskExecutionService implements 
BootService {
     // current processing profile task context
     private final AtomicReference<ProfileTaskExecutionContext> 
taskExecutionContext = new AtomicReference<>();
 
+    // profile executor thread pool, only running one thread
+    private final static ExecutorService PROFILE_EXECUTOR = 
Executors.newSingleThreadExecutor(new 
DefaultNamedThreadFactory("PROFILING-TASK"));
+
     // profile task list, include running and waiting running tasks
     private final List<ProfileTask> profileTaskList = 
Collections.synchronizedList(new LinkedList<>());
 
     /**
-     * get profile task from OAP
-     * @param task
+     * add profile task from OAP
      */
     public void addProfileTask(ProfileTask task) {
         // update last command create time
@@ -65,9 +71,9 @@ public class ProfileTaskExecutionService implements 
BootService {
         }
 
         // check profile task limit
-        final String dataError = checkProfileTaskSuccess(task);
-        if (dataError != null) {
-            logger.warn("check command error, cannot process this profile 
task. reason: {}", dataError);
+        final CheckResult dataError = checkProfileTaskSuccess(task);
+        if (!dataError.isSuccess()) {
+            logger.warn("check command error, cannot process this profile 
task. reason: {}", dataError.getErrorReason());
             return;
         }
 
@@ -85,18 +91,45 @@ public class ProfileTaskExecutionService implements 
BootService {
     }
 
     /**
+     * ask the currently executing profile task, if any, to start profiling the given {@link TracingContext}; returns false when no task is executing
+     */
+    public boolean addProfiling(TracingContext tracingContext, ID 
traceSegmentId, String firstSpanOPName) {
+        // no execution context means no profile task is running right now
+        final ProfileTaskExecutionContext executionContext = 
taskExecutionContext.get();
+        if (executionContext == null) {
+            return false;
+        }
+
+        return executionContext.attemptProfiling(tracingContext, 
traceSegmentId, firstSpanOPName);
+    }
+
+    /**
+     * Re-check whether the current trace needs profiling, in case a third-party plugin 
changed the operation name; returns false when no task is executing.
+     */
+    public boolean profilingRecheck(TracingContext tracingContext, ID 
traceSegmentId, String firstSpanOPName) {
+        // no execution context means no profile task is running right now
+        final ProfileTaskExecutionContext executionContext = 
taskExecutionContext.get();
+        if (executionContext == null) {
+            return false;
+        }
+
+        return executionContext.profilingRecheck(tracingContext, 
traceSegmentId, firstSpanOPName);
+    }
+
+    /**
      * active the selected profile task to execution task, and start a removal 
task for it.
-     * @param task
      */
     private synchronized void processProfileTask(ProfileTask task) {
         // make sure prev profile task already stopped
         stopCurrentProfileTask(taskExecutionContext.get());
 
         // make stop task schedule and task context
-        // TODO process task on next step
-        final ProfileTaskExecutionContext currentStartedTaskContext = new 
ProfileTaskExecutionContext(task, System.currentTimeMillis());
+        final ProfileTaskExecutionContext currentStartedTaskContext = new 
ProfileTaskExecutionContext(task);
         taskExecutionContext.set(currentStartedTaskContext);
 
+        // start profiling this task
+        currentStartedTaskContext.startProfiling(PROFILE_EXECUTOR);
+
         PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
             @Override
             public void run() {
@@ -108,36 +141,44 @@ public class ProfileTaskExecutionService implements 
BootService {
     /**
      * stop profile task, remove context data
      */
-    private synchronized void 
stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
+    synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext 
needToStop) {
         // stop same context only
         if (needToStop == null || 
!taskExecutionContext.compareAndSet(needToStop, null)) {
             return;
         }
 
+        // current execution stop running
+        needToStop.stopProfiling();
+
         // remove task
         profileTaskList.remove(needToStop.getTask());
 
-        // TODO notify OAP current profile task execute finish
+        // notify profiling task has finished
+        
ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class).notifyProfileTaskFinish(needToStop.getTask());
     }
 
     @Override
     public void prepare() throws Throwable {
-
     }
 
     @Override
     public void boot() throws Throwable {
-
     }
 
     @Override
     public void onComplete() throws Throwable {
-
+        // add trace finish notification
+        TracingContext.TracingThreadListenerManager.add(this);
     }
 
     @Override
     public void shutdown() throws Throwable {
+        // remove trace listener
+        TracingContext.TracingThreadListenerManager.remove(this);
+
         PROFILE_TASK_SCHEDULE.shutdown();
+
+        PROFILE_EXECUTOR.shutdown();
     }
 
     public long getLastCommandCreateTime() {
@@ -146,39 +187,37 @@ public class ProfileTaskExecutionService implements 
BootService {
 
     /**
      * check profile task data success, make the re-check, prevent receiving 
wrong data from database or OAP
-     * @param task
-     * @return
      */
-    private String checkProfileTaskSuccess(ProfileTask task) {
+    private CheckResult checkProfileTaskSuccess(ProfileTask task) {
         // endpoint name
-        if (StringUtil.isEmpty(task.getEndpointName())) {
-            return "endpoint name cannot be empty";
+        if (StringUtil.isEmpty(task.getFistSpanOPName())) {
+            return new CheckResult(false, "endpoint name cannot be empty");
         }
 
         // duration
         if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
-            return "monitor duration must greater than " + 
ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
+            return new CheckResult(false, "monitor duration must greater than 
" + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes");
         }
         if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
-            return "The duration of the monitoring task cannot be greater than 
" + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
+            return new CheckResult(false, "The duration of the monitoring task 
cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + " 
minutes");
         }
 
         // min duration threshold
         if (task.getMinDurationThreshold() < 0) {
-            return "min duration threshold must greater than or equals zero";
+            return new CheckResult(false, "min duration threshold must greater 
than or equals zero");
         }
 
         // dump period
         if (task.getThreadDumpPeriod() < 
ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
-            return "dump period must be greater than or equals " + 
ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
+            return new CheckResult(false, "dump period must be greater than or 
equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds");
         }
 
         // max sampling count
         if (task.getMaxSamplingCount() <= 0) {
-            return "max sampling count must greater than zero";
+            return new CheckResult(false, "max sampling count must greater 
than zero");
         }
         if (task.getMaxSamplingCount() >= 
ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
-            return "max sampling count must less than " + 
ProfileConstants.TASK_MAX_SAMPLING_COUNT;
+            return new CheckResult(false, "max sampling count must less than " 
+ ProfileConstants.TASK_MAX_SAMPLING_COUNT);
         }
 
         // check task queue, check only one task in a certain time
@@ -187,15 +226,46 @@ public class ProfileTaskExecutionService implements 
BootService {
 
             // if the end time of the task to be added is during the execution 
of any data, means is a error data
             if (taskProcessFinishTime >= profileTask.getStartTime() && 
taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) {
-                return "there already have processing task in time range, 
could not add a new task again. processing task monitor endpoint name: " + 
profileTask.getEndpointName();
+                return new CheckResult(false, "there already have processing 
task in time range, could not add a new task again. processing task monitor 
endpoint name: " + profileTask.getFistSpanOPName());
             }
         }
 
-        return null;
+        return new CheckResult(true, null);
     }
 
     private long calcProfileTaskFinishTime(ProfileTask task) {
         return task.getStartTime() + 
TimeUnit.MINUTES.toMillis(task.getDuration());
     }
 
+    @Override
+    public void afterMainThreadFinish(TracingContext tracingContext) {
+        if (tracingContext.isProfiling()) {
+            // stop profiling tracing context
+            ProfileTaskExecutionContext currentExecutionContext = 
taskExecutionContext.get();
+            if (currentExecutionContext != null) {
+                currentExecutionContext.stopTracingProfile(tracingContext);
+            }
+        }
+    }
+
+    /**
+     * result of checking whether a profile task is processable
+     */
+    private static class CheckResult {
+        private boolean success;
+        private String errorReason;
+
+        public CheckResult(boolean success, String errorReason) {
+            this.success = success;
+            this.errorReason = errorReason;
+        }
+
+        public boolean isSuccess() {
+            return success;
+        }
+
+        public String getErrorReason() {
+            return errorReason;
+        }
+    }
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
deleted file mode 100644
index 5cca040..0000000
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.agent.core.profile;
-
-import io.grpc.Channel;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
-import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
-import org.apache.skywalking.apm.agent.core.commands.CommandService;
-import org.apache.skywalking.apm.agent.core.conf.Config;
-import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
-import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
-import org.apache.skywalking.apm.network.common.Commands;
-import 
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
-import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
-
-/**
- * sniffer will check has new profile task list every {@link 
Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
- *
- * @author MrPro
- */
-@DefaultImplementor
-public class ProfileTaskQueryService implements BootService, Runnable, 
GRPCChannelListener {
-    private static final ILog logger = 
LogManager.getLogger(ProfileTaskQueryService.class);
-
-    private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
-    private volatile ProfileTaskGrpc.ProfileTaskBlockingStub 
profileTaskBlockingStub;
-    private volatile ScheduledFuture<?> getTaskListFuture;
-
-    @Override
-    public void run() {
-        if (RemoteDownstreamConfig.Agent.SERVICE_ID != 
DictionaryUtil.nullValue()
-                && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != 
DictionaryUtil.nullValue()
-        ) {
-            if (status == GRPCChannelStatus.CONNECTED) {
-                try {
-                    ProfileTaskCommandQuery.Builder builder = 
ProfileTaskCommandQuery.newBuilder();
-
-                    // sniffer info
-                    
builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
-
-                    // last command create time
-                    
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
-
-                    Commands commands = 
profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
-                    
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
-                } catch (Throwable t) {
-                    if (!(t instanceof StatusRuntimeException)) {
-                        logger.error(t, "query profile task from Collector 
fail.", t);
-                        return;
-                    }
-                    final StatusRuntimeException statusRuntimeException = 
(StatusRuntimeException) t;
-                    if (statusRuntimeException.getStatus().getCode() == 
Status.Code.UNIMPLEMENTED) {
-                        logger.warn("Backend doesn't support profiling, 
profiling will be disabled");
-                        if (getTaskListFuture != null) {
-                            getTaskListFuture.cancel(true);
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    @Override
-    public void prepare() throws Throwable {
-        
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
-    }
-
-    @Override
-    public void boot() throws Throwable {
-        if (Config.Profile.ACTIVE) {
-            getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new 
DefaultNamedThreadFactory("ProfileGetTaskService"))
-                    .scheduleWithFixedDelay(this, 0, 
Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
-        }
-    }
-
-    @Override
-    public void onComplete() throws Throwable {
-    }
-
-    @Override
-    public void shutdown() throws Throwable {
-        if (getTaskListFuture != null) {
-            getTaskListFuture.cancel(true);
-        }
-    }
-
-    @Override
-    public void statusChanged(GRPCChannelStatus status) {
-        if (GRPCChannelStatus.CONNECTED.equals(status)) {
-            Channel channel = 
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
-            profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
-        } else {
-            profileTaskBlockingStub = null;
-        }
-        this.status = status;
-    }
-}
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
new file mode 100644
index 0000000..086126f
--- /dev/null
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * Profile task process thread, dump the executing thread stack.
+ *
+ * @author MrPro
+ */
+public class ProfileThread implements Runnable {
+
+    private static final ILog logger = 
LogManager.getLogger(ProfileThread.class);
+
+    // profiling task context
+    private final ProfileTaskExecutionContext taskExecutionContext;
+
+    private final ProfileTaskExecutionService profileTaskExecutionService;
+    private final ProfileTaskChannelService profileTaskChannelService;
+
+    public ProfileThread(ProfileTaskExecutionContext taskExecutionContext) {
+        this.taskExecutionContext = taskExecutionContext;
+        profileTaskExecutionService = 
ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
+        profileTaskChannelService = 
ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class);
+    }
+
+    @Override
+    public void run() {
+
+        try {
+            profiling(taskExecutionContext);
+        } catch (InterruptedException e) {
+            // ignore interrupted
+            // means current task has stopped
+        } catch (Exception e) {
+            logger.error(e, "Profiling task fail. taskId:{}", 
taskExecutionContext.getTask().getTaskId());
+        } finally {
+            // always stop the current task and tell the execution service
+            
profileTaskExecutionService.stopCurrentProfileTask(taskExecutionContext);
+        }
+
+    }
+
+    /**
+     * start profiling
+     */
+    private void profiling(ProfileTaskExecutionContext executionContext) 
throws InterruptedException {
+
+        int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();
+
+        // keep looping until the current thread is interrupted
+        long currentLoopStartTime = -1;
+        while (!Thread.currentThread().isInterrupted()) {
+            currentLoopStartTime = System.currentTimeMillis();
+
+            // iterate over every profiler slot
+            AtomicReferenceArray<ThreadProfiler> profilers = 
executionContext.threadProfilerSlots();
+            int profilerCount = profilers.length();
+            for (int slot = 0; slot < profilerCount; slot++) {
+                ThreadProfiler currentProfiler = profilers.get(slot);
+                if (currentProfiler == null) {
+                    continue;
+                }
+
+                switch (currentProfiler.profilingStatus()) {
+
+                    case READY:
+                        // check tracing context running time
+                        currentProfiler.startProfilingIfNeed();
+                        break;
+
+                    case PROFILING:
+                        // dump stack
+                        TracingThreadSnapshot snapshot = 
currentProfiler.buildSnapshot();
+                        if (snapshot != null) {
+                            
profileTaskChannelService.addProfilingSnapshot(snapshot);
+                        } else {
+                            // dump failed: let the execution context stop this trace
+                            
executionContext.stopTracingProfile(currentProfiler.tracingContext());
+                        }
+                        break;
+
+                }
+            }
+
+            // sleep until the next dump period
+            // if this round already overran the period, sleep one full period
+            long needToSleep = (currentLoopStartTime + maxSleepPeriod) - 
System.currentTimeMillis();
+            needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
+            Thread.sleep(needToSleep);
+        }
+    }
+
+}
diff --git 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
similarity index 59%
copy from 
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to 
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
index c300fc3..5159c0c 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
@@ -16,30 +16,16 @@
  *
  */
 
-package org.apache.skywalking.e2e.profile;
-
-import org.springframework.web.bind.annotation.*;
+package org.apache.skywalking.apm.agent.core.profile;
 
 /**
  * @author MrPro
  */
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
-    private final UserRepo userRepo;
+public enum ProfilingStatus {
 
-    public TestController(final UserRepo userRepo) {
-        this.userRepo = userRepo;
-    }
+    READY,
 
-    @GetMapping("/health-check")
-    public String hello() {
-        return "healthy";
-    }
+    PROFILING,
 
-    @PostMapping("/users")
-    public User createAuthor(@RequestBody final User user) throws 
InterruptedException {
-        Thread.sleep(1000L);
-        return userRepo.save(user);
-    }
+    STOPPED
 }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
new file mode 100644
index 0000000..59b5e99
--- /dev/null
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import com.google.common.base.Objects;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author MrPro
+ */
+public class ThreadProfiler {
+
+    // current tracing context
+    private final TracingContext tracingContext;
+    // current tracing segment id
+    private final ID traceSegmentId;
+    // the thread to be profiled
+    private final Thread profilingThread;
+    // profiling execution context
+    private final ProfileTaskExecutionContext executionContext;
+
+    // profiling time
+    private long profilingStartTime;
+    private long profilingMaxTimeMills;
+
+    // stays READY until the min duration threshold check passes, then dumps start
+    private ProfilingStatus profilingStatus = ProfilingStatus.READY;
+    // thread dump sequence
+    private int dumpSequence = 0;
+
+    public ThreadProfiler(TracingContext tracingContext, ID traceSegmentId, 
Thread profilingThread, ProfileTaskExecutionContext executionContext) {
+        this.tracingContext = tracingContext;
+        this.traceSegmentId = traceSegmentId;
+        this.profilingThread = profilingThread;
+        this.executionContext = executionContext;
+        this.profilingMaxTimeMills = 
TimeUnit.MINUTES.toMillis(Config.Profile.MAX_DURATION);
+    }
+
+    /**
+     * If the time since the tracing context was created exceeds {@link 
ProfileTask#getMinDurationThreshold()}, start profiling the trace
+     */
+    public void startProfilingIfNeed() {
+        if (System.currentTimeMillis() - tracingContext.createTime() > 
executionContext.getTask().getMinDurationThreshold()) {
+            this.profilingStartTime = System.currentTimeMillis();
+            this.profilingStatus = ProfilingStatus.PROFILING;
+        }
+    }
+
+    /**
+     * Mark this profiler as stopped
+     */
+    public void stopProfiling() {
+        this.profilingStatus = ProfilingStatus.STOPPED;
+    }
+
+    /**
+     * dump tracing thread and build thread snapshot
+     *
+     * @return the snapshot, or null if none could be built and profiling should stop
+     */
+    public TracingThreadSnapshot buildSnapshot() {
+        if (!isProfilingContinuable()) {
+            return null;
+        }
+
+        long currentTime = System.currentTimeMillis();
+        // dump thread
+        StackTraceElement[] stackTrace;
+        try {
+            stackTrace = profilingThread.getStackTrace();
+
+            // a stack depth of zero means the thread has already finished
+            if (stackTrace.length == 0) {
+                return null;
+            }
+        } catch (Exception e) {
+            // dump error ignore and make this profiler stop
+            return null;
+        }
+
+        // on the first dump, check whether profiling is allowed to start
+        if (dumpSequence == 0 && (!executionContext.isStartProfileable())) {
+            return null;
+        }
+
+        int dumpElementCount = Math.min(stackTrace.length, 
Config.Profile.DUMP_MAX_STACK_DEPTH);
+
+        // store in inverted order: getStackTrace() returns the newest frame first
+        final ArrayList<String> stackList = new ArrayList<>(dumpElementCount);
+        for (int i = dumpElementCount - 1; i >= 0; i--) {
+            stackList.add(buildStackElementCodeSignature(stackTrace[i]));
+        }
+
+        String taskId = executionContext.getTask().getTaskId();
+        return new TracingThreadSnapshot(taskId, traceSegmentId, 
dumpSequence++, currentTime, stackList);
+    }
+
+    /**
+     * build thread stack element code signature
+     *
+     * @return code sign: className.methodName:lineNumber
+     */
+    private String buildStackElementCodeSignature(StackTraceElement element) {
+        return element.getClassName() + "." + element.getMethodName() + ":" + 
element.getLineNumber();
+    }
+
+    /**
+     * matches profiling tracing context
+     */
+    public boolean matches(TracingContext context) {
+        // match trace id
+        return Objects.equal(context.getReadableGlobalTraceId(), 
tracingContext.getReadableGlobalTraceId());
+    }
+
+    /**
+     * check whether profiling should continue
+     *
+     * @return if true means this thread profiling is continuable
+     */
+    private boolean isProfilingContinuable() {
+        return System.currentTimeMillis() - profilingStartTime < 
profilingMaxTimeMills;
+    }
+
+    public TracingContext tracingContext() {
+        return tracingContext;
+    }
+
+    public ProfilingStatus profilingStatus() {
+        return profilingStatus;
+    }
+
+}
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
new file mode 100644
index 0000000..dcfcf05
--- /dev/null
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
+import org.apache.skywalking.apm.network.language.profile.ThreadStack;
+
+import java.util.List;
+
+/**
+ * @author MrPro
+ */
+public class TracingThreadSnapshot {
+
+    // profile task and trace segment this snapshot belongs to
+    private final String taskId;
+    private final ID traceSegmentId;
+
+    // dump info
+    private final int sequence;
+    private final long time;
+    private final List<String> stackList;
+
+    public TracingThreadSnapshot(String taskId, ID traceSegmentId, int 
sequence, long time, List<String> stackList) {
+        this.taskId = taskId;
+        this.traceSegmentId = traceSegmentId;
+        this.sequence = sequence;
+        this.time = time;
+        this.stackList = stackList;
+    }
+
+    /**
+     * transform to gRPC data
+     */
+    public ThreadSnapshot transform() {
+        final ThreadSnapshot.Builder builder = ThreadSnapshot.newBuilder();
+        // task id
+        builder.setTaskId(taskId);
+        // dumped segment id
+        builder.setTraceSegmentId(traceSegmentId.transform());
+        // dump time
+        builder.setTime(time);
+        // snapshot dump sequence
+        builder.setSequence(sequence);
+        // snapshot stack
+        final ThreadStack.Builder stackBuilder = ThreadStack.newBuilder();
+        for (String codeSign : stackList) {
+            stackBuilder.addCodeSignatures(codeSign);
+        }
+        builder.setStack(stackBuilder);
+
+        return builder.build();
+    }
+
+
+}
diff --git 
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
 
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
index 826b032..392600c 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++ 
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -26,5 +26,5 @@ 
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
 org.apache.skywalking.apm.agent.core.commands.CommandService
 org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
 org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
-org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService
+org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
 org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
diff --git 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
index 0390dcc..53b4d4d 100644
--- 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+++ 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
@@ -22,13 +22,11 @@ package org.apache.skywalking.apm.agent.core.boot;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
-import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
+
+import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.jvm.JVMService;
 import 
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
-import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService;
 import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
 import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
 import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
@@ -64,11 +62,17 @@ public class ServiceManagerTest {
         
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
         
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
         
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
-        
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class));
+        
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class));
         
assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class));
 
         assertTracingContextListener();
         assertIgnoreTracingContextListener();
+        assertTracingThreadContextListener();
+    }
+
+    private void assertTracingThreadContextListener() throws Exception {
+        List<TracingThreadListener> listeners = 
getFieldValue(TracingContext.TracingThreadListenerManager.class, "LISTENERS");
+        assertThat(listeners.size(), is(1));
     }
 
     private void assertIgnoreTracingContextListener() throws Exception {
@@ -87,7 +91,7 @@ public class ServiceManagerTest {
         assertNotNull(service);
     }
 
-    private void assertProfileTaskQueryService(ProfileTaskQueryService 
service) {
+    private void assertProfileTaskQueryService(ProfileTaskChannelService 
service) {
         assertNotNull(service);
     }
 
diff --git 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
index 0943107..9686324 100644
--- 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
+++ 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
@@ -46,7 +46,7 @@ public class TracingContextTest {
         };
         TracingContext.ListenerManager.add(listener);
         try {
-            TracingContext tracingContext = new TracingContext();
+            TracingContext tracingContext = new TracingContext("/url");
             AbstractSpan span = tracingContext.createEntrySpan("/url");
 
             for (int i = 0; i < 10; i++) {
diff --git 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
index 1e4a987..bd2b368 100644
--- 
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
+++ 
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.test.tools;
 
 import java.util.HashMap;
 import java.util.LinkedList;
+
+import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
 import org.junit.rules.ExternalResource;
 import org.powermock.reflect.Whitebox;
 import org.apache.skywalking.apm.agent.core.boot.BootService;
@@ -37,6 +39,7 @@ public class AgentServiceRule extends ExternalResource {
         Whitebox.setInternalState(ServiceManager.INSTANCE, "bootedServices", 
new HashMap<Class, BootService>());
         Whitebox.setInternalState(TracingContext.ListenerManager.class, 
"LISTENERS", new LinkedList<TracingContextListener>());
         Whitebox.setInternalState(IgnoredTracerContext.ListenerManager.class, 
"LISTENERS", new LinkedList<TracingContextListener>());
+        
Whitebox.setInternalState(TracingContext.TracingThreadListenerManager.class, 
"LISTENERS", new LinkedList<TracingThreadListener>());
     }
 
     @Override
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index acd369b..ee6dc5a 100644
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -44,6 +44,18 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
 # If true, skywalking agent will enable profile when user create a new profile 
task. Otherwise disable profile.
 # profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
 
+# Parallel monitor segment count
+# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5}
+
+# Max time (in minutes) to monitor a segment; if monitoring of the current segment exceeds this limit, it is stopped.
+# profile.duration=${SW_AGENT_PROFILE_DURATION:10}
+
+# Max dump thread stack depth
+# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500}
+
+# Snapshot transport to backend buffer size
+# 
profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50}
+
 # Backend service addresses.
 
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
 
diff --git a/docs/en/setup/service-agent/java-agent/README.md 
b/docs/en/setup/service-agent/java-agent/README.md
index f38d67f..fc9946b 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -102,6 +102,10 @@ property key | Description | Default |
 `dictionary.service_code_buffer_size`|The buffer size of application codes and 
peer|`10 * 10000`|
 `dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and 
peer|`1000 * 10000`|
 `profile.active`|If true, skywalking agent will enable profile when user 
create a new profile task. Otherwise disable profile.|`true`|
+`profile.max_parallel`|Parallel monitor segment count|`5`|
+`profile.duration`|Max time (in minutes) to monitor a segment; if monitoring of the current segment exceeds this limit, it is stopped.|`10`|
+`profile.dump_max_stack_depth`|Max dump thread stack depth|`500`|
+`profile.snapshot_transport_buffer_size`|Snapshot transport to backend buffer 
size|`50`|
 `plugin.peer_max_length `|Peer maximum description limit.|`200`|
 `plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB 
access, default is false. Only trace the operation, not include 
parameters.|`false`|
 `plugin.mongodb.filter_length_limit`|If set to positive number, the 
`WriteRequest.params` would be truncated to this length, otherwise it would be 
completely saved, which may cause performance problem.|`256`|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml 
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 3f79711..faaa7e3 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -164,6 +164,8 @@ istio-telemetry:
   default:
 envoy-metric:
   default:
+receiver-profile:
+  default:
 #    alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
 #receiver_zipkin:
 #  default:
@@ -233,3 +235,4 @@ configuration:
 #  grpc:
 #    targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
 #    targetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
index b790fbc..22f6172 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
@@ -23,14 +23,18 @@ import com.google.common.cache.CacheBuilder;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import 
org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.module.Service;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -42,9 +46,11 @@ public class ProfileTaskCache implements Service {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProfileTaskCache.class);
 
-    private final Cache<Integer, List<ProfileTask>> profileTaskCache;
+    private final Cache<Integer, List<ProfileTask>> profileTaskDownstreamCache;
+    private final Cache<String, ProfileTask> profileTaskIdCache;
 
     private final ModuleManager moduleManager;
+    private IProfileTaskQueryDAO profileTaskQueryDAO;
 
     public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig 
moduleConfig) {
         this.moduleManager = moduleManager;
@@ -52,9 +58,18 @@ public class ProfileTaskCache implements Service {
         long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L;
         int initialCapacitySize = (int)(initialSize > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : initialSize);
 
-        profileTaskCache = 
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
+        profileTaskDownstreamCache = 
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
                 // remove old profile task data
                 .expireAfterWrite(Duration.ofMinutes(1)).build();
+
+        profileTaskIdCache = 
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()).build();
+    }
+
+    /**
+     * Returns the profile task query DAO, resolving it from the storage module on first
+     * use and keeping it in a field for later calls. No synchronization is applied here.
+     */
+    private IProfileTaskQueryDAO getProfileTaskQueryDAO() {
+        if (Objects.isNull(profileTaskQueryDAO)) {
+            profileTaskQueryDAO = 
moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
+        }
+        return profileTaskQueryDAO;
     }
 
     /**
@@ -64,11 +79,33 @@ public class ProfileTaskCache implements Service {
      */
     public List<ProfileTask> getProfileTaskList(int serviceId) {
         // read profile task list from cache only, use cache update timer 
mechanism
-        List<ProfileTask> profileTaskList = 
profileTaskCache.getIfPresent(serviceId);
+        List<ProfileTask> profileTaskList = 
profileTaskDownstreamCache.getIfPresent(serviceId);
         return profileTaskList;
     }
 
     /**
+     * Query a profile task by id. The id cache is consulted first; on a miss the task is
+     * loaded through the profile task query DAO and, when found, put into the id cache.
+     * @param id profile task id
+     * @return the matching task, or null when it is not cached and the DAO query returns
+     *         nothing or fails with an IOException (the failure is only logged)
+     */
+    public ProfileTask getProfileTaskById(String id) {
+        ProfileTask profile = profileTaskIdCache.getIfPresent(id);
+
+        if (profile == null) {
+            try {
+                profile = getProfileTaskQueryDAO().getById(id);
+            } catch (IOException e) {
+                LOGGER.error(e.getMessage(), e);
+            }
+            if (profile != null) {
+                profileTaskIdCache.put(id, profile);
+            }
+        }
+
+        return profile;
+    }
+
+    /**
      * save service task list
      * @param serviceId
      * @param taskList
@@ -78,7 +115,7 @@ public class ProfileTaskCache implements Service {
             taskList = Collections.emptyList();
         }
 
-        profileTaskCache.put(serviceId, taskList);
+        profileTaskDownstreamCache.put(serviceId, taskList);
     }
 
     /**
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
index cb6fa43..c152d83 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
@@ -43,7 +43,7 @@ public class CommandService implements Service {
 
     public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
         final String serialNumber = UUID.randomUUID().toString();
-        return new ProfileTaskCommand(serialNumber, task.getEndpointName(), 
task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(), 
task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
+        return new ProfileTaskCommand(serialNumber, task.getId(), 
task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(), 
task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(), 
task.getCreateTime());
     }
 
     private String generateSerialNumber(final int serviceInstanceId, final 
long time, final String serviceInstanceUUID) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
new file mode 100644
index 0000000..36a4cba
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.core.profile;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import 
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+
+/**
+ * Storage record for one thread snapshot taken while profiling a trace segment.
+ *
+ * <p>Each record carries the profile task id, the segment id, the dump time, the snapshot
+ * sequence number and the dumped stack in binary form. The {@link Stream} declaration
+ * binds it to {@link RecordStreamProcessor} under the name {@link #INDEX_NAME}.
+ *
+ * @author MrPro
+ */
+@Getter
+@Setter
+@ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name = "ProfileTaskSegmentSnapshot")
+@Stream(name = ProfileTaskSegmentSnapshotRecord.INDEX_NAME, scopeId = PROFILE_TASK_SEGMENT_SNAPSHOT, builder = ProfileTaskSegmentSnapshotRecord.Builder.class, processor = RecordStreamProcessor.class)
+public class ProfileTaskSegmentSnapshotRecord extends Record {
+
+    public static final String INDEX_NAME = "profile_task_segment_snapshot";
+    public static final String TASK_ID = "task_id";
+    public static final String SEGMENT_ID = "segment_id";
+    public static final String DUMP_TIME = "dump_time";
+    public static final String SEQUENCE = "sequence";
+    public static final String STACK_BINARY = "stack_binary";
+
+    @Column(columnName = TASK_ID) private String taskId;
+    @Column(columnName = SEGMENT_ID) private String segmentId;
+    @Column(columnName = DUMP_TIME) private long dumpTime;
+    @Column(columnName = SEQUENCE) private int sequence;
+    @Column(columnName = STACK_BINARY) private byte[] stackBinary;
+
+    /**
+     * Builds the record id from task id, segment id and sequence, each followed by
+     * {@code Const.ID_SPLIT}. The trailing separator is kept so generated ids do not change.
+     *
+     * @return the record id
+     */
+    @Override
+    public String id() {
+        return getTaskId() + Const.ID_SPLIT + getSegmentId() + Const.ID_SPLIT + getSequence() + Const.ID_SPLIT;
+    }
+
+    /**
+     * Converts between {@link ProfileTaskSegmentSnapshotRecord} and its column map.
+     * The stack bytes are written as Base64 text; an empty stack is written as an empty string.
+     */
+    public static class Builder implements StorageBuilder<ProfileTaskSegmentSnapshotRecord> {
+
+        /**
+         * Rebuilds a record from its column map.
+         *
+         * @param dbMap column name to stored value
+         * @return the populated record; the stack is an empty array when the stored text is empty
+         */
+        @Override
+        public ProfileTaskSegmentSnapshotRecord map2Data(Map<String, Object> dbMap) {
+            final ProfileTaskSegmentSnapshotRecord snapshot = new ProfileTaskSegmentSnapshotRecord();
+            snapshot.setTaskId((String)dbMap.get(TASK_ID));
+            snapshot.setSegmentId((String)dbMap.get(SEGMENT_ID));
+            snapshot.setDumpTime(((Number)dbMap.get(DUMP_TIME)).longValue());
+            snapshot.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+            // Read the time bucket as a long: narrowing it through intValue() can silently
+            // truncate the stored value.
+            // NOTE(review): assumes Record#setTimeBucket accepts a long - confirm.
+            snapshot.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+            final String encodedStack = (String)dbMap.get(STACK_BINARY);
+            if (StringUtil.isEmpty(encodedStack)) {
+                snapshot.setStackBinary(new byte[] {});
+            } else {
+                snapshot.setStackBinary(Base64.getDecoder().decode(encodedStack));
+            }
+            return snapshot;
+        }
+
+        /**
+         * Flattens a record into its column map.
+         *
+         * @param storageData the record to store
+         * @return column name to value, with the stack encoded as Base64 text
+         */
+        @Override
+        public Map<String, Object> data2Map(ProfileTaskSegmentSnapshotRecord storageData) {
+            final Map<String, Object> map = new HashMap<>();
+            map.put(TASK_ID, storageData.getTaskId());
+            map.put(SEGMENT_ID, storageData.getSegmentId());
+            map.put(DUMP_TIME, storageData.getDumpTime());
+            map.put(SEQUENCE, storageData.getSequence());
+            map.put(TIME_BUCKET, storageData.getTimeBucket());
+            if (CollectionUtils.isEmpty(storageData.getStackBinary())) {
+                map.put(STACK_BINARY, Const.EMPTY_STRING);
+            } else {
+                // encodeToString avoids new String(byte[]), which depends on the platform default charset
+                map.put(STACK_BINARY, Base64.getEncoder().encodeToString(storageData.getStackBinary()));
+            }
+            return map;
+        }
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 54f9dfc..c6741b5 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -69,6 +69,7 @@ public class DefaultScopeDefine {
     public static final int HTTP_ACCESS_LOG = 25;
     public static final int PROFILE_TASK = 26;
     public static final int PROFILE_TASK_LOG = 27;
+    public static final int PROFILE_TASK_SEGMENT_SNAPSHOT = 28;
 
     /**
      * Catalog of scope, the metrics processor could use this to group all 
generated metrics by oal rt.
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
index 5e6fa09..7ca639d 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
@@ -41,4 +41,11 @@ public interface IProfileTaskQueryDAO extends DAO {
      */
     List<ProfileTask> getTaskList(final Integer serviceId, final String 
endpointName, final Long startTimeBucket, final Long endTimeBucket, final 
Integer limit) throws IOException;
 
+    /**
+     * Query a single profile task by its id.
+     * @param id profile task id
+     * @return the matching task; the Elasticsearch and H2 implementations return null when the id is empty or no task matches
+     */
+    ProfileTask getById(final String id) throws IOException;
+
 }
diff --git 
a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
 
b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
index 0511808..b181fd3 100644
--- 
a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
+++ 
b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
@@ -20,19 +20,25 @@ package 
org.apache.skywalking.oap.server.receiver.profile.provider.handler;
 
 import io.grpc.stub.StreamObserver;
 import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
 import 
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import 
org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
 import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import 
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
 import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
 import org.apache.skywalking.oap.server.core.command.CommandService;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import 
org.apache.skywalking.oap.server.core.profile.ProfileTaskSegmentSnapshotRecord;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
 import 
org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +48,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class ProfileTaskServiceHandler extends 
ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProfileTaskServiceHandler.class);
+
     private ProfileTaskCache profileTaskCache;
     private final CommandService commandService;
 
@@ -71,7 +79,7 @@ public class ProfileTaskServiceHandler extends 
ProfileTaskGrpc.ProfileTaskImplBa
             }
 
             // record profile task log
-            recordProfileTaskLog(profileTask, request);
+            recordProfileTaskLog(profileTask, request.getInstanceId(), 
ProfileTaskLogOperationType.NOTIFIED);
 
             // add command
             
commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
@@ -81,11 +89,72 @@ public class ProfileTaskServiceHandler extends 
ProfileTaskGrpc.ProfileTaskImplBa
         responseObserver.onCompleted();
     }
 
-    private void recordProfileTaskLog(ProfileTask task, 
ProfileTaskCommandQuery query) {
+    /**
+     * Client-streaming endpoint that receives thread snapshots. Every snapshot is converted
+     * into a ProfileTaskSegmentSnapshotRecord and handed to the RecordStreamProcessor.
+     * When the client completes the stream, an empty Commands message is returned.
+     */
+    @Override
+    public StreamObserver<ThreadSnapshot> 
collectSnapshot(StreamObserver<Commands> responseObserver) {
+        return new StreamObserver<ThreadSnapshot>() {
+            @Override
+            public void onNext(ThreadSnapshot snapshot) {
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug("receive profile segment snapshot");
+                }
+
+                // build the segment id by joining the id parts with "."
+                UniqueId uniqueId = snapshot.getTraceSegmentId();
+                StringBuilder segmentIdBuilder = new StringBuilder();
+                for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
+                    if (i == 0) {
+                        
segmentIdBuilder.append(uniqueId.getIdPartsList().get(i));
+                    } else {
+                        
segmentIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
+                    }
+                }
+
+                // build the storage record; the time bucket is derived from the snapshot time
+                final ProfileTaskSegmentSnapshotRecord record = new 
ProfileTaskSegmentSnapshotRecord();
+                record.setTaskId(snapshot.getTaskId());
+                record.setSegmentId(segmentIdBuilder.toString());
+                record.setDumpTime(snapshot.getTime());
+                record.setSequence(snapshot.getSequence());
+                record.setStackBinary(snapshot.getStack().toByteArray());
+                
record.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime()));
+
+                // async storage
+                RecordStreamProcessor.getInstance().in(record);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // log the stream error and close the response without sending a Commands message
+                LOGGER.error(throwable.getMessage(), throwable);
+                responseObserver.onCompleted();
+            }
+
+            @Override
+            public void onCompleted() {
+                responseObserver.onNext(Commands.newBuilder().build());
+                responseObserver.onCompleted();
+            }
+        };
+    }
+
+    /**
+     * Handles a task-finish report: when the reported task id resolves to a known profile
+     * task, an EXECUTION_FINISHED log is recorded for the reporting instance. An empty
+     * Commands message is returned in every case.
+     */
+    @Override
+    public void reportTaskFinish(ProfileTaskFinishReport request, 
StreamObserver<Commands> responseObserver) {
+        // look up the task; recordProfileTaskLog derives the log's time bucket from its start time and duration
+        final ProfileTask profileTask = 
profileTaskCache.getProfileTaskById(request.getTaskId());
+
+        // record finish log; an unknown task id is silently skipped
+        if (profileTask != null) {
+            recordProfileTaskLog(profileTask, request.getInstanceId(), 
ProfileTaskLogOperationType.EXECUTION_FINISHED);
+        }
+
+        responseObserver.onNext(Commands.newBuilder().build());
+        responseObserver.onCompleted();
+    }
+
+    private void recordProfileTaskLog(ProfileTask task, int instanceId, 
ProfileTaskLogOperationType operationType) {
         final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
         logRecord.setTaskId(task.getId());
-        logRecord.setInstanceId(query.getInstanceId());
-        
logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode());
+        logRecord.setInstanceId(instanceId);
+        logRecord.setOperationType(operationType.getCode());
         logRecord.setOperationTime(System.currentTimeMillis());
         // same with task time bucket, ensure record will ttl same with 
profile task
         
logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() + 
TimeUnit.MINUTES.toMillis(task.getDuration())));
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
index d7598cf..dbd2dee 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
@@ -88,6 +88,25 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements 
IProfileTaskQueryDAO
         return tasks;
     }
 
+    /**
+     * Looks up a profile task in the ProfileTaskNoneStream index with an ids query limited
+     * to one hit. Returns null when the id is empty or the search has no hits.
+     */
+    @Override
+    public ProfileTask getById(String id) throws IOException {
+        if (StringUtil.isEmpty(id)) {
+            return null;
+        }
+
+        SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+        sourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
+        sourceBuilder.size(1);
+
+        final SearchResponse response = 
getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder);
+
+        if (response.getHits().getHits().length > 0) {
+            return parseTask(response.getHits().getHits()[0]);
+        }
+
+        return null;
+    }
+
     private ProfileTask parseTask(SearchHit data) {
         return ProfileTask.builder()
                 .id(data.getId())
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
index a526b49..248ca70 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
@@ -22,6 +22,7 @@ import org.apache.skywalking.apm.util.StringUtil;
 import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream;
 import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
 import 
org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
 
 import java.io.IOException;
@@ -87,6 +88,29 @@ public class H2ProfileTaskQueryDAO implements 
IProfileTaskQueryDAO {
         }
     }
 
+    /**
+     * Looks up a profile task by id with a parameterized "where id=? LIMIT 1" query on the
+     * ProfileTaskNoneStream table. Returns null when the id is empty or no row matches;
+     * SQLException and JDBCClientException are rethrown wrapped in an IOException.
+     */
+    @Override
+    public ProfileTask getById(String id) throws IOException {
+        if (StringUtil.isEmpty(id)) {
+            return null;
+        }
+
+        final StringBuilder sql = new StringBuilder();
+        final ArrayList<Object> condition = new ArrayList<>(1);
+        sql.append("select * from 
").append(ProfileTaskNoneStream.INDEX_NAME).append(" where id=? LIMIT 1");
+        condition.add(id);
+
+        try (Connection connection = h2Client.getConnection()) {
+            try (ResultSet resultSet = h2Client.executeQuery(connection, 
sql.toString(), condition.toArray(new Object[0]))) {
+                if (resultSet.next()) {
+                    return parseTask(resultSet);
+                }
+            }
+        } catch (SQLException | JDBCClientException e) {
+            throw new IOException(e);
+        }
+        return null;
+    }
+
     /**
      * parse profile task data
      * @param data
diff --git 
a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
 
b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index 528c490..b998f4c 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ 
b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
 
         while (true) {
             try {
-                final Map<String, String> user = new HashMap<>();
-                user.put("name", "SkyWalking");
-                final ResponseEntity<String> responseEntity = 
restTemplate.postForEntity(
-                        instrumentedServiceUrl + "/e2e/users",
-                        user,
-                        String.class
-                );
+                final ResponseEntity<String> responseEntity = 
sendRequest(false);
                 LOGGER.info("responseEntity: {}", responseEntity);
                 
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
                 final List<Trace> traces = profileClient.traces(
@@ -123,6 +117,17 @@ public class ProfileVerificationITCase {
 
     }
 
+    /**
+     * POSTs a user named "SkyWalking" to the instrumented service's /e2e/users endpoint.
+     * @param needProfiling sent to the service as the "enableProfiling" field of the request body
+     * @return the response entity returned by the service
+     */
+    private ResponseEntity<String> sendRequest(boolean needProfiling) {
+        final Map<String, String> user = new HashMap<>();
+        user.put("name", "SkyWalking");
+        user.put("enableProfiling", String.valueOf(needProfiling));
+        return restTemplate.postForEntity(
+                instrumentedServiceUrl + "/e2e/users",
+                user,
+                String.class
+        );
+    }
+
     /**
      * verify create profile task
      * @param minutesAgo
@@ -134,10 +139,10 @@ public class ProfileVerificationITCase {
         final ProfileTaskCreationRequest creationRequest = 
ProfileTaskCreationRequest.builder()
                 .serviceId(2)
                 .endpointName("/e2e/users")
-                .duration(5)
+                .duration(1)
                 .startTime(-1)
-                .minDurationThreshold(10)
-                .dumpPeriod(10)
+                .minDurationThreshold(1000)
+                .dumpPeriod(50)
                 .maxSamplingCount(5).build();
 
         // verify create task
@@ -147,18 +152,29 @@ public class ProfileVerificationITCase {
         ProfileTaskCreationResultMatcher creationResultMatcher = new 
ProfileTaskCreationResultMatcher();
         creationResultMatcher.verify(creationResult);
 
+        // verify get task list and sniffer get task logs
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+        // send a profile request
+        sendRequest(true);
+
+        // verify task execution finish
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+    }
+
+    private void verifyProfileTask(int serviceId, String verifyResources) 
throws InterruptedException {
         // verify get task list and logs
         for (int i = 0; i < 10; i++) {
             try {
                 final ProfileTasks tasks = profileClient.getProfileTaskList(
                         new ProfileTaskQuery()
-                                .serviceId(creationRequest.getServiceId())
+                                .serviceId(serviceId)
                                 .endpointName("")
                 );
                 LOGGER.info("get profile task list: {}", tasks);
 
                 InputStream expectedInputStream =
-                        new 
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+                        new 
ClassPathResource(verifyResources).getInputStream();
 
                 final ProfilesTasksMatcher servicesMatcher = new 
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
                 servicesMatcher.verify(tasks);
diff --git 
a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
 
b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index cab6dff..2289ae8 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ 
b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
 
         while (true) {
             try {
-                final Map<String, String> user = new HashMap<>();
-                user.put("name", "SkyWalking");
-                final ResponseEntity<String> responseEntity = 
restTemplate.postForEntity(
-                        instrumentedServiceUrl + "/e2e/users",
-                        user,
-                        String.class
-                );
+                final ResponseEntity<String> responseEntity = 
sendRequest(false);
                 LOGGER.info("responseEntity: {}", responseEntity);
                 
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
                 final List<Trace> traces = profileClient.traces(
@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
 
     }
 
+    private ResponseEntity<String> sendRequest(boolean needProfiling) {
+        final Map<String, String> user = new HashMap<>();
+        user.put("name", "SkyWalking");
+        user.put("enableProfiling", String.valueOf(needProfiling));
+        return restTemplate.postForEntity(
+                instrumentedServiceUrl + "/e2e/users",
+                user,
+                String.class
+        );
+    }
+
     /**
      * verify create profile task
      * @param minutesAgo
@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
         final ProfileTaskCreationRequest creationRequest = 
ProfileTaskCreationRequest.builder()
                 .serviceId(2)
                 .endpointName("/e2e/users")
-                .duration(5)
+                .duration(1)
                 .startTime(-1)
-                .minDurationThreshold(10)
-                .dumpPeriod(10)
+                .minDurationThreshold(1000)
+                .dumpPeriod(50)
                 .maxSamplingCount(5).build();
 
         // verify create task
@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
         ProfileTaskCreationResultMatcher creationResultMatcher = new 
ProfileTaskCreationResultMatcher();
         creationResultMatcher.verify(creationResult);
 
+        // verify get task list and sniffer get task logs
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+        // send a profile request
+        sendRequest(true);
+
+        // verify task execution finish
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+    }
+
+    private void verifyProfileTask(int serviceId, String verifyResources) 
throws InterruptedException {
         // verify get task list and logs
         for (int i = 0; i < 10; i++) {
             try {
                 final ProfileTasks tasks = profileClient.getProfileTaskList(
                         new ProfileTaskQuery()
-                                .serviceId(creationRequest.getServiceId())
+                                .serviceId(serviceId)
                                 .endpointName("")
                 );
                 LOGGER.info("get profile task list: {}", tasks);
 
                 InputStream expectedInputStream =
-                        new 
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+                        new 
ClassPathResource(verifyResources).getInputStream();
 
                 final ProfilesTasksMatcher servicesMatcher = new 
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
                 servicesMatcher.verify(tasks);
diff --git 
a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
 
b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index cab6dff..2289ae8 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++ 
b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
 
         while (true) {
             try {
-                final Map<String, String> user = new HashMap<>();
-                user.put("name", "SkyWalking");
-                final ResponseEntity<String> responseEntity = 
restTemplate.postForEntity(
-                        instrumentedServiceUrl + "/e2e/users",
-                        user,
-                        String.class
-                );
+                final ResponseEntity<String> responseEntity = 
sendRequest(false);
                 LOGGER.info("responseEntity: {}", responseEntity);
                 
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
                 final List<Trace> traces = profileClient.traces(
@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
 
     }
 
+    private ResponseEntity<String> sendRequest(boolean needProfiling) {
+        final Map<String, String> user = new HashMap<>();
+        user.put("name", "SkyWalking");
+        user.put("enableProfiling", String.valueOf(needProfiling));
+        return restTemplate.postForEntity(
+                instrumentedServiceUrl + "/e2e/users",
+                user,
+                String.class
+        );
+    }
+
     /**
      * verify create profile task
      * @param minutesAgo
@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
         final ProfileTaskCreationRequest creationRequest = 
ProfileTaskCreationRequest.builder()
                 .serviceId(2)
                 .endpointName("/e2e/users")
-                .duration(5)
+                .duration(1)
                 .startTime(-1)
-                .minDurationThreshold(10)
-                .dumpPeriod(10)
+                .minDurationThreshold(1000)
+                .dumpPeriod(50)
                 .maxSamplingCount(5).build();
 
         // verify create task
@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
         ProfileTaskCreationResultMatcher creationResultMatcher = new 
ProfileTaskCreationResultMatcher();
         creationResultMatcher.verify(creationResult);
 
+        // verify get task list and sniffer get task logs
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+        // send a profile request
+        sendRequest(true);
+
+        // verify task execution finish
+        verifyProfileTask(creationRequest.getServiceId(), 
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+    }
+
+    private void verifyProfileTask(int serviceId, String verifyResources) 
throws InterruptedException {
         // verify get task list and logs
         for (int i = 0; i < 10; i++) {
             try {
                 final ProfileTasks tasks = profileClient.getProfileTaskList(
                         new ProfileTaskQuery()
-                                .serviceId(creationRequest.getServiceId())
+                                .serviceId(serviceId)
                                 .endpointName("")
                 );
                 LOGGER.info("get profile task list: {}", tasks);
 
                 InputStream expectedInputStream =
-                        new 
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+                        new 
ClassPathResource(verifyResources).getInputStream();
 
                 final ProfilesTasksMatcher servicesMatcher = new 
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
                 servicesMatcher.verify(tasks);
diff --git 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
 
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
similarity index 63%
copy from 
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to 
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
index c300fc3..158f013 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++ 
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
@@ -18,28 +18,34 @@
 
 package org.apache.skywalking.e2e.profile;
 
-import org.springframework.web.bind.annotation.*;
-
 /**
  * @author MrPro
  */
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
-    private final UserRepo userRepo;
+public class CreateUser {
+
+    private String name;
+
+    private boolean enableProfiling;
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
 
-    public TestController(final UserRepo userRepo) {
-        this.userRepo = userRepo;
+    public boolean getEnableProfiling() {
+        return enableProfiling;
     }
 
-    @GetMapping("/health-check")
-    public String hello() {
-        return "healthy";
+    public void setEnableProfiling(boolean enableProfiling) {
+        this.enableProfiling = enableProfiling;
     }
 
-    @PostMapping("/users")
-    public User createAuthor(@RequestBody final User user) throws 
InterruptedException {
-        Thread.sleep(1000L);
-        return userRepo.save(user);
+    public User toUser() {
+        final User user = new User();
+        user.setName(name);
+        return user;
     }
 }
diff --git 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
 
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
index c300fc3..7cb6fac 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++ 
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.e2e.profile;
 
 import org.springframework.web.bind.annotation.*;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * @author MrPro
  */
@@ -38,8 +40,14 @@ public class TestController {
     }
 
     @PostMapping("/users")
-    public User createAuthor(@RequestBody final User user) throws 
InterruptedException {
-        Thread.sleep(1000L);
-        return userRepo.save(user);
+    public User createAuthor(@RequestBody final CreateUser createUser) throws 
InterruptedException {
+        final User user = userRepo.save(createUser.toUser());
+        if (!createUser.getEnableProfiling()) {
+            return user;
+        } else {
+            // sleep 10 second
+            TimeUnit.SECONDS.sleep(10);
+            return user;
+        }
     }
 }
diff --git 
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
 
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
similarity index 90%
copy from 
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
copy to 
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
index 2aac5b2..03274e3 100644
--- 
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
+++ 
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
@@ -26,5 +26,10 @@ tasks:
     logs:
       - id: not null
         instanceId: gt 0
+        operationType: EXECUTION_FINISHED
+        operationTime: gt 0
+      - id: not null
+        instanceId: gt 0
         operationType: NOTIFIED
         operationTime: gt 0
+
diff --git 
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
 
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml
similarity index 100%
rename from 
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
rename to 
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml

Reply via email to