This is an automated email from the ASF dual-hosted git repository.

chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f2b0ae7a2 Revert PR#30201 and remove ProcessIdContext and pass processId when execute sql (#30204)
a9f2b0ae7a2 is described below

commit a9f2b0ae7a243bfbcb114d427b4dd4206abc6196
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Feb 20 13:00:09 2024 +0800

    Revert PR#30201 and remove ProcessIdContext and pass processId when execute sql (#30204)
    
    * Revert "Adjust jdbc processId to statement level to avoid monitor block when run benchmarksql (#30201)"
    
    This reverts commit 1ccc9ac6551741aeb0e806311ca48f9f5e737a67.
    
    * Remove ProcessIdContext and pass processId when execute sql
---
 .../infra/executor/kernel/ExecutorEngine.java      | 32 ++++++-----
 .../executor/kernel/model/ExecutorCallback.java    |  3 +-
 .../engine/driver/jdbc/JDBCExecutorCallback.java   |  8 +--
 .../sql/execute/engine/raw/RawExecutor.java        |  2 +-
 .../raw/callback/RawSQLExecutorCallback.java       |  9 +--
 .../infra/executor/sql/process/ProcessEngine.java  | 22 +++----
 .../executor/sql/process/ProcessIdContext.java     | 66 ---------------------
 .../kernel/fixture/ExecutorCallbackFixture.java    |  2 +-
 .../engine/jdbc/JDBCExecutorCallbackTest.java      | 10 +++-
 .../executor/sql/process/ProcessEngineTest.java    |  4 +-
 .../executor/sql/process/ProcessIdContextTest.java | 67 ----------------------
 .../driver/executor/DriverJDBCExecutor.java        |  6 +-
 .../batch/BatchPreparedStatementExecutor.java      |  5 +-
 .../jdbc/adapter/AbstractStatementAdapter.java     |  6 --
 .../core/connection/ShardingSphereConnection.java  |  9 +++
 .../statement/ShardingSpherePreparedStatement.java | 16 ++----
 .../core/statement/ShardingSphereStatement.java    | 12 ++--
 .../statement/CircuitBreakerPreparedStatement.java |  5 --
 .../batch/BatchPreparedStatementExecutorTest.java  |  5 +-
 .../executor/SQLFederationExecutorContext.java     |  2 +
 .../enumerable/EnumerableScanExecutor.java         |  5 +-
 .../proxy/backend/connector/DatabaseConnector.java |  2 +-
 .../connector/jdbc/executor/ProxyJDBCExecutor.java |  2 +-
 .../handler/distsql/rul/PreviewExecutor.java       |  6 +-
 .../OpenGaussSystemCatalogAdminQueryExecutor.java  |  3 +-
 25 files changed, 86 insertions(+), 223 deletions(-)

diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
index 2e5f2c7e849..3703e95d1ec 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.infra.executor.kernel;
 
 import lombok.Getter;
+import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnknownSQLException;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnknownSQLException;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -72,39 +72,41 @@ public final class ExecutorEngine implements AutoCloseable {
         if (executionGroupContext.getInputGroups().isEmpty()) {
             return Collections.emptyList();
         }
-        return serial ? 
serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback, 
callback)
-                : 
parallelExecute(executionGroupContext.getInputGroups().iterator(), 
firstCallback, callback);
+        return serial ? 
serialExecute(executionGroupContext.getInputGroups().iterator(), 
executionGroupContext.getReportContext().getProcessId(), firstCallback, 
callback)
+                : 
parallelExecute(executionGroupContext.getInputGroups().iterator(), 
executionGroupContext.getReportContext().getProcessId(), firstCallback, 
callback);
     }
     
-    private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>> 
executionGroups, final ExecutorCallback<I, O> firstCallback, final 
ExecutorCallback<I, O> callback) throws SQLException {
+    private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>> 
executionGroups, final String processId, final ExecutorCallback<I, O> 
firstCallback,
+                                         final ExecutorCallback<I, O> 
callback) throws SQLException {
         ExecutionGroup<I> firstInputs = executionGroups.next();
-        List<O> result = new LinkedList<>(syncExecute(firstInputs, null == 
firstCallback ? callback : firstCallback));
+        List<O> result = new LinkedList<>(syncExecute(firstInputs, processId, 
null == firstCallback ? callback : firstCallback));
         while (executionGroups.hasNext()) {
-            result.addAll(syncExecute(executionGroups.next(), callback));
+            result.addAll(syncExecute(executionGroups.next(), processId, 
callback));
         }
         return result;
     }
     
-    private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> 
executionGroups, final ExecutorCallback<I, O> firstCallback, final 
ExecutorCallback<I, O> callback) throws SQLException {
+    private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>> 
executionGroups, final String processId, final ExecutorCallback<I, O> 
firstCallback,
+                                           final ExecutorCallback<I, O> 
callback) throws SQLException {
         ExecutionGroup<I> firstInputs = executionGroups.next();
-        Collection<Future<Collection<O>>> restResultFutures = 
asyncExecute(executionGroups, callback);
-        return getGroupResults(syncExecute(firstInputs, null == firstCallback 
? callback : firstCallback), restResultFutures);
+        Collection<Future<Collection<O>>> restResultFutures = 
asyncExecute(executionGroups, processId, callback);
+        return getGroupResults(syncExecute(firstInputs, processId, null == 
firstCallback ? callback : firstCallback), restResultFutures);
     }
     
-    private <I, O> Collection<O> syncExecute(final ExecutionGroup<I> 
executionGroup, final ExecutorCallback<I, O> callback) throws SQLException {
-        return callback.execute(executionGroup.getInputs(), true);
+    private <I, O> Collection<O> syncExecute(final ExecutionGroup<I> 
executionGroup, final String processId, final ExecutorCallback<I, O> callback) 
throws SQLException {
+        return callback.execute(executionGroup.getInputs(), true, processId);
     }
     
-    private <I, O> Collection<Future<Collection<O>>> asyncExecute(final 
Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O> 
callback) {
+    private <I, O> Collection<Future<Collection<O>>> asyncExecute(final 
Iterator<ExecutionGroup<I>> executionGroups, final String processId, final 
ExecutorCallback<I, O> callback) {
         Collection<Future<Collection<O>>> result = new LinkedList<>();
         while (executionGroups.hasNext()) {
-            result.add(asyncExecute(executionGroups.next(), callback));
+            result.add(asyncExecute(executionGroups.next(), processId, 
callback));
         }
         return result;
     }
     
-    private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I> 
executionGroup, final ExecutorCallback<I, O> callback) {
-        return executorServiceManager.getExecutorService().submit(() -> 
callback.execute(executionGroup.getInputs(), false));
+    private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I> 
executionGroup, final String processId, final ExecutorCallback<I, O> callback) {
+        return executorServiceManager.getExecutorService().submit(() -> 
callback.execute(executionGroup.getInputs(), false, processId));
     }
     
     private <O> List<O> getGroupResults(final Collection<O> firstResults, 
final Collection<Future<Collection<O>>> restFutures) throws SQLException {
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
index 30b170138c3..7999cd5bb92 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
@@ -33,8 +33,9 @@ public interface ExecutorCallback<I, O> {
      * 
      * @param inputs input values
      * @param isTrunkThread is execution in trunk thread
+     * @param processId process ID
      * @return execution results
      * @throws SQLException throw when execute failure
      */
-    Collection<O> execute(Collection<I> inputs, boolean isTrunkThread) throws 
SQLException;
+    Collection<O> execute(Collection<I> inputs, boolean isTrunkThread, String 
processId) throws SQLException;
 }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index bbdcbf5b1a2..60fc2461919 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -55,11 +55,11 @@ public abstract class JDBCExecutorCallback<T> implements 
ExecutorCallback<JDBCEx
     private final ProcessEngine processEngine = new ProcessEngine();
     
     @Override
-    public final Collection<T> execute(final Collection<JDBCExecutionUnit> 
executionUnits, final boolean isTrunkThread) throws SQLException {
+    public final Collection<T> execute(final Collection<JDBCExecutionUnit> 
executionUnits, final boolean isTrunkThread, final String processId) throws 
SQLException {
         // TODO It is better to judge whether need sane result before execute, 
can avoid exception thrown
         Collection<T> result = new LinkedList<>();
         for (JDBCExecutionUnit each : executionUnits) {
-            T executeResult = execute(each, isTrunkThread);
+            T executeResult = execute(each, isTrunkThread, processId);
             if (null != executeResult) {
                 result.add(executeResult);
             }
@@ -72,7 +72,7 @@ public abstract class JDBCExecutorCallback<T> implements 
ExecutorCallback<JDBCEx
      *
      * @see <a 
href="https://github.com/apache/skywalking/blob/master/docs/en/guides/Java-Plugin-Development-Guide.md#user-content-plugin-development-guide";>Plugin
 Development Guide</a>
      */
-    private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean 
isTrunkThread) throws SQLException {
+    private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean 
isTrunkThread, final String processId) throws SQLException {
         SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
         DatabaseType storageType = 
resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
         ConnectionProperties connectionProps = 
resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
@@ -82,7 +82,7 @@ public abstract class JDBCExecutorCallback<T> implements 
ExecutorCallback<JDBCEx
             
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
 sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
             T result = executeSQL(sqlUnit.getSql(), 
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), 
storageType);
             sqlExecutionHook.finishSuccess();
-            processEngine.completeSQLUnitExecution(jdbcExecutionUnit);
+            processEngine.completeSQLUnitExecution(jdbcExecutionUnit, 
processId);
             return result;
         } catch (final SQLException ex) {
             if (!storageType.equals(protocolType)) {
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index a5fd69c101c..720ee3202c6 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -61,7 +61,7 @@ public final class RawExecutor {
             List<ExecuteResult> results = execute(executionGroupContext, 
(RawSQLExecutorCallback) null, callback);
             return results.isEmpty() || null == results.get(0) ? 
Collections.singletonList(new UpdateResult(0, 0L)) : results;
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
     
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 338962123f2..00d3f9f384d 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
 import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 
@@ -45,12 +44,10 @@ public final class RawSQLExecutorCallback implements 
ExecutorCallback<RawSQLExec
     
     @SuppressWarnings("unchecked")
     @Override
-    public Collection<ExecuteResult> execute(final 
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws 
SQLException {
+    public Collection<ExecuteResult> execute(final 
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final 
String processId) throws SQLException {
         Collection<ExecuteResult> result = 
callbacks.iterator().next().execute(inputs, isTrunkThread);
-        if (!ProcessIdContext.isEmpty()) {
-            for (RawSQLExecutionUnit each : inputs) {
-                processEngine.completeSQLUnitExecution(each);
-            }
+        for (RawSQLExecutionUnit each : inputs) {
+            processEngine.completeSQLUnitExecution(each, processId);
         }
         return result;
     }
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 4a24993ae1b..5044a92fff1 100644
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++ 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.executor.sql.process;
 
+import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
 import 
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
@@ -44,9 +45,7 @@ public final class ProcessEngine {
      * @return process ID
      */
     public String connect(final Grantee grantee, final String databaseName) {
-        // TODO remove processId return value, and use ProcessIdContext.get() 
instead
         String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
-        ProcessIdContext.set(processId);
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext =
                 new ExecutionGroupContext<>(Collections.emptyList(), new 
ExecutionGroupReportContext(processId, databaseName, grantee));
         Process process = new Process(executionGroupContext);
@@ -60,9 +59,7 @@ public final class ProcessEngine {
      * @param processId process ID
      */
     public void disconnect(final String processId) {
-        // TODO remove processId parameter, and use ProcessIdContext.get() 
instead
         ProcessRegistry.getInstance().remove(processId);
-        ProcessIdContext.remove();
     }
     
     /**
@@ -81,12 +78,13 @@ public final class ProcessEngine {
      * Complete SQL unit execution.
      * 
      * @param executionUnit execution unit
+     * @param processId process ID
      */
-    public void completeSQLUnitExecution(final SQLExecutionUnit executionUnit) 
{
-        if (ProcessIdContext.isEmpty()) {
+    public void completeSQLUnitExecution(final SQLExecutionUnit executionUnit, 
final String processId) {
+        if (Strings.isNullOrEmpty(processId)) {
             return;
         }
-        Process process = 
ProcessRegistry.getInstance().get(ProcessIdContext.get());
+        Process process = ProcessRegistry.getInstance().get(processId);
         if (null == process) {
             return;
         }
@@ -96,17 +94,19 @@ public final class ProcessEngine {
     
     /**
      * Complete SQL execution.
+     * 
+     * @param processId process ID
      */
-    public void completeSQLExecution() {
-        if (ProcessIdContext.isEmpty()) {
+    public void completeSQLExecution(final String processId) {
+        if (Strings.isNullOrEmpty(processId)) {
             return;
         }
-        Process process = 
ProcessRegistry.getInstance().get(ProcessIdContext.get());
+        Process process = ProcessRegistry.getInstance().get(processId);
         if (null == process) {
             return;
         }
         ExecutionGroupContext<? extends SQLExecutionUnit> 
executionGroupContext = new ExecutionGroupContext<>(
-                Collections.emptyList(), new 
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(), 
new Grantee(process.getUsername(), process.getHostname())));
+                Collections.emptyList(), new 
ExecutionGroupReportContext(processId, process.getDatabaseName(), new 
Grantee(process.getUsername(), process.getHostname())));
         ProcessRegistry.getInstance().add(new Process(executionGroupContext));
     }
     
diff --git 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
 
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
deleted file mode 100644
index efe872f4f2e..00000000000
--- 
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import com.alibaba.ttl.TransmittableThreadLocal;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-// TODO should remove the class, process ID should same with connection ID
-/**
- * Process ID context.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ProcessIdContext {
-    
-    private static final TransmittableThreadLocal<String> PROCESS_ID = new 
TransmittableThreadLocal<>();
-    
-    /**
-     * Judge whether process ID is empty or not.
-     *
-     * @return whether process ID is empty or not
-     */
-    public static boolean isEmpty() {
-        return null == PROCESS_ID.get();
-    }
-    
-    /**
-     * Get process ID.
-     *
-     * @return process ID
-     */
-    public static String get() {
-        return PROCESS_ID.get();
-    }
-    
-    /**
-     * Set process ID.
-     *
-     * @param processId process ID
-     */
-    public static void set(final String processId) {
-        PROCESS_ID.set(processId);
-    }
-    
-    /**
-     * Remove process ID.
-     */
-    public static void remove() {
-        PROCESS_ID.remove();
-    }
-}
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
index eac1493b680..a4487917bfc 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
@@ -31,7 +31,7 @@ public final class ExecutorCallbackFixture implements 
ExecutorCallback<Object, S
     private final CountDownLatch latch;
     
     @Override
-    public Collection<String> execute(final Collection<Object> inputs, final 
boolean isTrunkThread) {
+    public Collection<String> execute(final Collection<Object> inputs, final 
boolean isTrunkThread, final String processId) {
         List<String> result = new LinkedList<>();
         inputs.forEach(each -> {
             latch.countDown();
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index 1369779743e..8520296fceb 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -40,6 +40,8 @@ import java.sql.Statement;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -81,8 +83,9 @@ class JDBCExecutorCallbackTest {
                         return Optional.of(saneResult);
                     }
                 };
-        assertThat(callback.execute(units, true), 
is(Collections.singletonList(saneResult)));
-        assertThat(callback.execute(units, false), 
is(Collections.emptyList()));
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        assertThat(callback.execute(units, true, processId), 
is(Collections.singletonList(saneResult)));
+        assertThat(callback.execute(units, false, processId), 
is(Collections.emptyList()));
     }
     
     @Test
@@ -102,6 +105,7 @@ class JDBCExecutorCallbackTest {
                         return Optional.empty();
                     }
                 };
-        assertThrows(SQLException.class, () -> callback.execute(units, true));
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        assertThrows(SQLException.class, () -> callback.execute(units, true, 
processId));
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index f3eb13cc104..4cfbaa4bc35 100644
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++ 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -79,10 +79,8 @@ class ProcessEngineTest {
     
     @Test
     void assertCompleteSQLUnitExecution() {
-        ProcessIdContext.set("foo_id");
         when(processRegistry.get("foo_id")).thenReturn(mock(Process.class));
-        new 
ProcessEngine().completeSQLUnitExecution(mock(SQLExecutionUnit.class));
+        new 
ProcessEngine().completeSQLUnitExecution(mock(SQLExecutionUnit.class), 
"foo_id");
         verify(processRegistry).get("foo_id");
-        ProcessIdContext.remove();
     }
 }
diff --git 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
 
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
deleted file mode 100644
index 5a70bc485ba..00000000000
--- 
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class ProcessIdContextTest {
-    
-    @AfterEach
-    void tearDown() {
-        ProcessIdContext.remove();
-    }
-    
-    @Test
-    void assertIsEmpty() {
-        assertTrue(ProcessIdContext.isEmpty());
-        ProcessIdContext.set("123e4567e89b12d3a456426655440000");
-        assertFalse(ProcessIdContext.isEmpty());
-    }
-    
-    @Test
-    void assertGet() {
-        assertNull(ProcessIdContext.get());
-        ProcessIdContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ProcessIdContext.get(), 
is("123e4567e89b12d3a456426655440000"));
-    }
-    
-    @Test
-    void assertSet() {
-        assertNull(ProcessIdContext.get());
-        ProcessIdContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ProcessIdContext.get(), 
is("123e4567e89b12d3a456426655440000"));
-        ProcessIdContext.set("123e4567e89b12d3a456426655440001");
-        assertThat(ProcessIdContext.get(), 
is("123e4567e89b12d3a456426655440001"));
-    }
-    
-    @Test
-    void assertRemove() {
-        assertNull(ProcessIdContext.get());
-        ProcessIdContext.set("123e4567e89b12d3a456426655440000");
-        assertThat(ProcessIdContext.get(), 
is("123e4567e89b12d3a456426655440000"));
-        ProcessIdContext.remove();
-        assertNull(ProcessIdContext.get());
-    }
-}
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 5ee5584e957..2c5308a90c5 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -75,7 +75,7 @@ public final class DriverJDBCExecutor {
             processEngine.executeSQL(executionGroupContext, queryContext);
             return jdbcExecutor.execute(executionGroupContext, callback);
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
     
@@ -99,7 +99,7 @@ public final class DriverJDBCExecutor {
                     ? accumulate(results)
                     : results.get(0);
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
     
@@ -137,7 +137,7 @@ public final class DriverJDBCExecutor {
             List<Boolean> results = doExecute(executionGroupContext, 
queryContext.getSqlStatementContext(), routeUnits, callback);
             return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
     
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 9173d58b68c..94c18eccbef 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -29,7 +29,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
 import 
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
 import 
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
@@ -65,11 +64,11 @@ public final class BatchPreparedStatementExecutor {
     
     private final String databaseName;
     
-    public BatchPreparedStatementExecutor(final MetaDataContexts 
metaDataContexts, final JDBCExecutor jdbcExecutor, final String databaseName) {
+    public BatchPreparedStatementExecutor(final MetaDataContexts 
metaDataContexts, final JDBCExecutor jdbcExecutor, final String databaseName, 
final String processId) {
         this.databaseName = databaseName;
         this.metaDataContexts = metaDataContexts;
         this.jdbcExecutor = jdbcExecutor;
-        executionGroupContext = new ExecutionGroupContext<>(new 
LinkedList<>(), new ExecutionGroupReportContext(ProcessIdContext.get(), 
databaseName, new Grantee("", "")));
+        executionGroupContext = new ExecutionGroupContext<>(new 
LinkedList<>(), new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", "")));
         batchExecutionUnits = new LinkedList<>();
     }
     
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 1614bb0c08e..0d1b4a37456 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOper
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -49,8 +48,6 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     @Getter(AccessLevel.NONE)
     private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new 
ForceExecuteTemplate<>();
     
-    private final ProcessEngine processEngine = new ProcessEngine();
-    
     private boolean poolable;
     
     private int fetchSize;
@@ -93,8 +90,6 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
     
     protected abstract StatementManager getStatementManager();
     
-    protected abstract String getProcessId();
-    
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public final void setPoolable(final boolean poolable) throws SQLException {
@@ -234,7 +229,6 @@ public abstract class AbstractStatementAdapter extends 
AbstractUnsupportedOperat
             }
         } finally {
             getRoutedStatements().clear();
-            processEngine.disconnect(getProcessId());
         }
     }
 }
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 68989e91bfa..532a3743029 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,6 +24,8 @@ import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePrepar
 import 
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
 import 
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -42,6 +44,8 @@ import java.sql.Statement;
  */
 public final class ShardingSphereConnection extends AbstractConnectionAdapter {
     
+    private final ProcessEngine processEngine = new ProcessEngine();
+    
     @Getter
     private final String databaseName;
     
@@ -51,6 +55,9 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     @Getter
     private final DriverDatabaseConnectionManager databaseConnectionManager;
     
+    @Getter
+    private final String processId;
+    
     private boolean autoCommit = true;
     
     private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -63,6 +70,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
         this.databaseName = databaseName;
         this.contextManager = contextManager;
         databaseConnectionManager = new 
DriverDatabaseConnectionManager(databaseName, contextManager);
+        processId = processEngine.connect(new Grantee("", ""), databaseName);
     }
     
     /**
@@ -301,6 +309,7 @@ public final class ShardingSphereConnection extends 
AbstractConnectionAdapter {
     public void close() throws SQLException {
         closed = true;
         databaseConnectionManager.close();
+        processEngine.disconnect(processId);
     }
     
     private ConnectionContext getConnectionContext() {
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 5565a09b206..6f289b7aced 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -149,9 +149,6 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
     
-    @Getter
-    private final String processId;
-    
     @Getter
     private final boolean selectContainsEnhancedTable;
     
@@ -212,13 +209,12 @@ public final class ShardingSpherePreparedStatement 
extends AbstractPreparedState
         statementOption = returnGeneratedKeys ? new StatementOption(true, 
columns) : new StatementOption(resultSetType, resultSetConcurrency, 
resultSetHoldability);
         executor = new DriverExecutor(connection);
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(connection.getContextManager().getExecutorEngine(), 
connection.getDatabaseConnectionManager().getConnectionContext());
-        batchPreparedStatementExecutor = new 
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName);
+        batchPreparedStatementExecutor = new 
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName, 
connection.getProcessId());
         kernelProcessor = new KernelProcessor();
         statementsCacheable = 
isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData());
         trafficRule = 
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
         selectContainsEnhancedTable = sqlStatementContext instanceof 
SelectStatementContext && ((SelectStatementContext) 
sqlStatementContext).isContainsEnhancedTable();
         statementManager = new StatementManager();
-        processId = getProcessEngine().connect(new Grantee("", ""), 
databaseName);
     }
     
     private boolean isStatementsCacheable(final RuleMetaData 
databaseRuleMetaData) {
@@ -284,7 +280,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singleton(executionUnit), new 
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new 
Grantee("", "")));
         if (context.getInputGroups().isEmpty() || 
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
             throw new EmptyTrafficExecutionUnitException();
         }
@@ -331,7 +327,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ResultSet executeFederationQuery(final QueryContext queryContext) {
         PreparedStatementExecuteQueryCallback callback = new 
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
                 
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
-        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData());
+        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData(), connection.getProcessId());
         return 
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
 callback, context);
     }
     
@@ -456,7 +452,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new 
Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecutionContext 
executionContext) throws SQLException {
@@ -532,7 +528,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", "")));
+                new ExecutionGroupReportContext(connection.getProcessId(), 
databaseName, new Grantee("", "")));
     }
     
     @Override
@@ -724,7 +720,7 @@ public final class ShardingSpherePreparedStatement extends 
AbstractPreparedState
             ExecutionUnit executionUnit = each.getExecutionUnit();
             executionUnits.add(executionUnit);
         }
-        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", ""))));
+        
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(), 
executionUnits, new ExecutionGroupReportContext(connection.getProcessId(), 
databaseName, new Grantee("", ""))));
         setBatchParametersForStatements(batchExecutor);
     }
     
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 7208285bb55..83de9d7a266 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -121,9 +121,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     @Getter(AccessLevel.PROTECTED)
     private final StatementManager statementManager;
     
-    @Getter
-    private final String processId;
-    
     private final BatchStatementExecutor batchStatementExecutor;
     
     private boolean returnGeneratedKeys;
@@ -157,7 +154,6 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         statementManager = new StatementManager();
         batchStatementExecutor = new BatchStatementExecutor(this);
         databaseName = connection.getDatabaseName();
-        processId = getProcessEngine().connect(new Grantee("", ""), 
databaseName);
     }
     
     @Override
@@ -242,7 +238,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         StatementExecuteQueryCallback callback = new 
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
                 
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(), 
queryContext.getSqlStatementContext().getSqlStatement(),
                 SQLExecutorExceptionHandler.isExceptionThrown());
-        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData());
+        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData(), connection.getProcessId());
         return 
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
 callback, context);
     }
     
@@ -481,7 +477,7 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new 
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
         ExecutionGroupContext<JDBCExecutionUnit> context =
-                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+                prepareEngine.prepare(new RouteContext(), 
Collections.singletonList(executionUnit), new 
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new 
Grantee("", "")));
         return context.getInputGroups().stream().flatMap(each -> 
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
     }
     
@@ -528,13 +524,13 @@ public final class ShardingSphereStatement extends 
AbstractStatementAdapter {
     private ExecutionGroupContext<JDBCExecutionUnit> 
createExecutionGroupContext(final ExecutionContext executionContext) throws 
SQLException {
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine();
         return prepareEngine.prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(),
-                new ExecutionGroupReportContext(processId, databaseName, new 
Grantee("", "")));
+                new ExecutionGroupReportContext(connection.getProcessId(), 
databaseName, new Grantee("", "")));
     }
     
     private ExecutionGroupContext<RawSQLExecutionUnit> 
createRawExecutionContext(final ExecutionContext executionContext) throws 
SQLException {
         int maxConnectionsSizePerQuery = 
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
         return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery, 
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
-                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+                .prepare(executionContext.getRouteContext(), 
executionContext.getExecutionUnits(), new 
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new 
Grantee("", "")));
     }
     
     private boolean executeWithExecutionContext(final ExecuteCallback 
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index e179f53f3b2..ac096d5e97a 100644
--- a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++ b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -284,11 +284,6 @@ public final class CircuitBreakerPreparedStatement extends 
AbstractUnsupportedOp
         return null;
     }
     
-    @Override
-    protected String getProcessId() {
-        return null;
-    }
-    
     @Override
     public ResultSet executeQuery() {
         return new CircuitBreakerResultSet();
diff --git a/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java b/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 6c8cb0acb12..23a6fc48c01 100644
--- a/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++ b/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -88,8 +88,9 @@ class BatchPreparedStatementExecutorTest {
     void setUp() {
         SQLExecutorExceptionHandler.setExceptionThrown(true);
         ShardingSphereConnection connection = new 
ShardingSphereConnection("foo_db", mockContextManager());
-        executor = new BatchPreparedStatementExecutor(
-                connection.getContextManager().getMetaDataContexts(), new 
JDBCExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext()), "foo_db");
+        String processId = new UUID(ThreadLocalRandom.current().nextLong(), 
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+        executor = new 
BatchPreparedStatementExecutor(connection.getContextManager().getMetaDataContexts(),
+                new JDBCExecutor(executorEngine, 
connection.getDatabaseConnectionManager().getConnectionContext()), "foo_db", 
processId);
         
when(sqlStatementContext.getTablesContext()).thenReturn(mock(TablesContext.class));
     }
     
diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
index cb3557269d8..247cccfa59e 100644
--- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
+++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
@@ -40,4 +40,6 @@ public final class SQLFederationExecutorContext {
     private final QueryContext queryContext;
     
     private final ShardingSphereMetaData metaData;
+    
+    private final String processId;
 }
diff --git a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 6bb47cac627..01fd2362172 100644
--- a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++ b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -41,7 +41,6 @@ import 
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
 import 
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
 import 
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
 import org.apache.shardingsphere.infra.hint.HintValueContext;
 import org.apache.shardingsphere.infra.merge.MergeEngine;
 import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -142,7 +141,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
         try {
             return createEnumerable(queryContext, database, context);
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(federationContext.getProcessId());
         }
     }
     
@@ -155,7 +154,7 @@ public final class EnumerableScanExecutor implements 
ScanExecutor {
                 computeConnectionOffsets(context);
                 // TODO pass grantee from proxy and jdbc adapter
                 ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext 
= prepareEngine.prepare(context.getRouteContext(), 
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
-                        new 
ExecutionGroupReportContext(ProcessIdContext.get(), database.getName(), new 
Grantee("", "")));
+                        new 
ExecutionGroupReportContext(executorContext.getFederationContext().getProcessId(),
 database.getName(), new Grantee("", "")));
                 setParameters(executionGroupContext.getInputGroups());
                 processEngine.executeSQL(executionGroupContext, 
context.getQueryContext());
                 List<QueryResult> queryResults = 
jdbcExecutor.execute(executionGroupContext, 
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 7bfa9e62012..ab30083ad4b 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -231,7 +231,7 @@ public final class DatabaseConnector implements 
DatabaseBackendHandler {
         ProxyJDBCExecutorCallback callback = 
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType, 
database.getResourceMetaData(),
                 queryContext.getSqlStatementContext().getSqlStatement(), this, 
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaDataContexts);
-        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData());
+        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, queryContext, 
metaDataContexts.getMetaData(), 
databaseConnectionManager.getConnectionSession().getProcessId());
         return 
proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback, 
context);
     }
     
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
index 27ab4572437..e8800671f90 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
@@ -78,7 +78,7 @@ public final class ProxyJDBCExecutor {
                             isExceptionThrown,
                             false));
         } finally {
-            processEngine.completeSQLExecution();
+            
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
     }
 }
diff --git a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
index 8f450a881ea..d33f7c8fc6c 100644
--- a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
+++ b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
@@ -23,6 +23,7 @@ import 
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorConnection
 import 
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
 import 
org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
 import 
org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
+import 
org.apache.shardingsphere.distsql.handler.exception.rule.RuleNotExistedException;
 import org.apache.shardingsphere.distsql.statement.rul.sql.PreviewStatement;
 import 
org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
 import 
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -54,8 +55,8 @@ import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.session.query.QueryContext;
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import 
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
 import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
-import 
org.apache.shardingsphere.distsql.handler.exception.rule.RuleNotExistedException;
 import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
 import 
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
 import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
@@ -129,7 +130,8 @@ public final class PreviewExecutor implements 
DistSQLQueryExecutor<PreviewStatem
         // TODO move dialect MySQLInsertStatement into database type module 
@zhangliang
         boolean isReturnGeneratedKeys = sqlStatement instanceof 
MySQLInsertStatement;
         DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys, 
metaData.getProps());
-        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(true, queryContext, metaData);
+        SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(true, queryContext, metaData,
+                ((ProxyDatabaseConnectionManager) 
connectionContext.getDatabaseConnectionManager()).getConnectionSession().getProcessId());
         federationEngine.executeQuery(prepareEngine, 
createPreviewCallback(sqlStatement), context);
         return context.getPreviewExecutionUnits();
     }
diff --git a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
index fc074fb2788..b168a20ca09 100644
--- a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
+++ b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
@@ -85,7 +85,8 @@ public final class OpenGaussSystemCatalogAdminQueryExecutor 
implements DatabaseA
         JDBCExecutor jdbcExecutor = new 
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(), 
connectionSession.getConnectionContext());
         try (SQLFederationEngine sqlFederationEngine = new 
SQLFederationEngine(databaseName, PG_CATALOG, metaDataContexts.getMetaData(), 
metaDataContexts.getStatistics(), jdbcExecutor)) {
             DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> 
prepareEngine = createDriverExecutionPrepareEngine(metaDataContexts, 
connectionSession);
-            SQLFederationExecutorContext context = new 
SQLFederationExecutorContext(false, new QueryContext(sqlStatementContext, sql, 
parameters), metaDataContexts.getMetaData());
+            SQLFederationExecutorContext context =
+                    new SQLFederationExecutorContext(false, new 
QueryContext(sqlStatementContext, sql, parameters), 
metaDataContexts.getMetaData(), connectionSession.getProcessId());
             ShardingSphereDatabase database = 
metaDataContexts.getMetaData().getDatabase(databaseName);
             ResultSet resultSet = 
sqlFederationEngine.executeQuery(prepareEngine,
                     
createOpenGaussSystemCatalogAdminQueryCallback(database.getProtocolType(), 
database.getResourceMetaData(), sqlStatementContext.getSqlStatement()), 
context);

Reply via email to