This is an automated email from the ASF dual-hosted git repository.
chengzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9f2b0ae7a2 Revert PR#30201 and remove ProcessIdContext and pass
processId when execute sql (#30204)
a9f2b0ae7a2 is described below
commit a9f2b0ae7a243bfbcb114d427b4dd4206abc6196
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Tue Feb 20 13:00:09 2024 +0800
Revert PR#30201 and remove ProcessIdContext and pass processId when execute
sql (#30204)
* Revert "Adjust jdbc processId to statement level to avoid monitor block
when run benchmarksql (#30201)"
This reverts commit 1ccc9ac6551741aeb0e806311ca48f9f5e737a67.
* Remove ProcessIdContext and pass processId when execute sql
---
.../infra/executor/kernel/ExecutorEngine.java | 32 ++++++-----
.../executor/kernel/model/ExecutorCallback.java | 3 +-
.../engine/driver/jdbc/JDBCExecutorCallback.java | 8 +--
.../sql/execute/engine/raw/RawExecutor.java | 2 +-
.../raw/callback/RawSQLExecutorCallback.java | 9 +--
.../infra/executor/sql/process/ProcessEngine.java | 22 +++----
.../executor/sql/process/ProcessIdContext.java | 66 ---------------------
.../kernel/fixture/ExecutorCallbackFixture.java | 2 +-
.../engine/jdbc/JDBCExecutorCallbackTest.java | 10 +++-
.../executor/sql/process/ProcessEngineTest.java | 4 +-
.../executor/sql/process/ProcessIdContextTest.java | 67 ----------------------
.../driver/executor/DriverJDBCExecutor.java | 6 +-
.../batch/BatchPreparedStatementExecutor.java | 5 +-
.../jdbc/adapter/AbstractStatementAdapter.java | 6 --
.../core/connection/ShardingSphereConnection.java | 9 +++
.../statement/ShardingSpherePreparedStatement.java | 16 ++----
.../core/statement/ShardingSphereStatement.java | 12 ++--
.../statement/CircuitBreakerPreparedStatement.java | 5 --
.../batch/BatchPreparedStatementExecutorTest.java | 5 +-
.../executor/SQLFederationExecutorContext.java | 2 +
.../enumerable/EnumerableScanExecutor.java | 5 +-
.../proxy/backend/connector/DatabaseConnector.java | 2 +-
.../connector/jdbc/executor/ProxyJDBCExecutor.java | 2 +-
.../handler/distsql/rul/PreviewExecutor.java | 6 +-
.../OpenGaussSystemCatalogAdminQueryExecutor.java | 3 +-
25 files changed, 86 insertions(+), 223 deletions(-)
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
index 2e5f2c7e849..3703e95d1ec 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/ExecutorEngine.java
@@ -18,11 +18,11 @@
package org.apache.shardingsphere.infra.executor.kernel;
import lombok.Getter;
+import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnknownSQLException;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorServiceManager;
-import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnknownSQLException;
import java.sql.SQLException;
import java.util.Collection;
@@ -72,39 +72,41 @@ public final class ExecutorEngine implements AutoCloseable {
if (executionGroupContext.getInputGroups().isEmpty()) {
return Collections.emptyList();
}
- return serial ?
serialExecute(executionGroupContext.getInputGroups().iterator(), firstCallback,
callback)
- :
parallelExecute(executionGroupContext.getInputGroups().iterator(),
firstCallback, callback);
+ return serial ?
serialExecute(executionGroupContext.getInputGroups().iterator(),
executionGroupContext.getReportContext().getProcessId(), firstCallback,
callback)
+ :
parallelExecute(executionGroupContext.getInputGroups().iterator(),
executionGroupContext.getReportContext().getProcessId(), firstCallback,
callback);
}
- private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>>
executionGroups, final ExecutorCallback<I, O> firstCallback, final
ExecutorCallback<I, O> callback) throws SQLException {
+ private <I, O> List<O> serialExecute(final Iterator<ExecutionGroup<I>>
executionGroups, final String processId, final ExecutorCallback<I, O>
firstCallback,
+ final ExecutorCallback<I, O>
callback) throws SQLException {
ExecutionGroup<I> firstInputs = executionGroups.next();
- List<O> result = new LinkedList<>(syncExecute(firstInputs, null ==
firstCallback ? callback : firstCallback));
+ List<O> result = new LinkedList<>(syncExecute(firstInputs, processId,
null == firstCallback ? callback : firstCallback));
while (executionGroups.hasNext()) {
- result.addAll(syncExecute(executionGroups.next(), callback));
+ result.addAll(syncExecute(executionGroups.next(), processId,
callback));
}
return result;
}
- private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>>
executionGroups, final ExecutorCallback<I, O> firstCallback, final
ExecutorCallback<I, O> callback) throws SQLException {
+ private <I, O> List<O> parallelExecute(final Iterator<ExecutionGroup<I>>
executionGroups, final String processId, final ExecutorCallback<I, O>
firstCallback,
+ final ExecutorCallback<I, O>
callback) throws SQLException {
ExecutionGroup<I> firstInputs = executionGroups.next();
- Collection<Future<Collection<O>>> restResultFutures =
asyncExecute(executionGroups, callback);
- return getGroupResults(syncExecute(firstInputs, null == firstCallback
? callback : firstCallback), restResultFutures);
+ Collection<Future<Collection<O>>> restResultFutures =
asyncExecute(executionGroups, processId, callback);
+ return getGroupResults(syncExecute(firstInputs, processId, null ==
firstCallback ? callback : firstCallback), restResultFutures);
}
- private <I, O> Collection<O> syncExecute(final ExecutionGroup<I>
executionGroup, final ExecutorCallback<I, O> callback) throws SQLException {
- return callback.execute(executionGroup.getInputs(), true);
+ private <I, O> Collection<O> syncExecute(final ExecutionGroup<I>
executionGroup, final String processId, final ExecutorCallback<I, O> callback)
throws SQLException {
+ return callback.execute(executionGroup.getInputs(), true, processId);
}
- private <I, O> Collection<Future<Collection<O>>> asyncExecute(final
Iterator<ExecutionGroup<I>> executionGroups, final ExecutorCallback<I, O>
callback) {
+ private <I, O> Collection<Future<Collection<O>>> asyncExecute(final
Iterator<ExecutionGroup<I>> executionGroups, final String processId, final
ExecutorCallback<I, O> callback) {
Collection<Future<Collection<O>>> result = new LinkedList<>();
while (executionGroups.hasNext()) {
- result.add(asyncExecute(executionGroups.next(), callback));
+ result.add(asyncExecute(executionGroups.next(), processId,
callback));
}
return result;
}
- private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I>
executionGroup, final ExecutorCallback<I, O> callback) {
- return executorServiceManager.getExecutorService().submit(() ->
callback.execute(executionGroup.getInputs(), false));
+ private <I, O> Future<Collection<O>> asyncExecute(final ExecutionGroup<I>
executionGroup, final String processId, final ExecutorCallback<I, O> callback) {
+ return executorServiceManager.getExecutorService().submit(() ->
callback.execute(executionGroup.getInputs(), false, processId));
}
private <O> List<O> getGroupResults(final Collection<O> firstResults,
final Collection<Future<Collection<O>>> restFutures) throws SQLException {
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
index 30b170138c3..7999cd5bb92 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/kernel/model/ExecutorCallback.java
@@ -33,8 +33,9 @@ public interface ExecutorCallback<I, O> {
*
* @param inputs input values
* @param isTrunkThread is execution in trunk thread
+ * @param processId process ID
* @return execution results
* @throws SQLException throw when execute failure
*/
- Collection<O> execute(Collection<I> inputs, boolean isTrunkThread) throws
SQLException;
+ Collection<O> execute(Collection<I> inputs, boolean isTrunkThread, String
processId) throws SQLException;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
index bbdcbf5b1a2..60fc2461919 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/driver/jdbc/JDBCExecutorCallback.java
@@ -55,11 +55,11 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
private final ProcessEngine processEngine = new ProcessEngine();
@Override
- public final Collection<T> execute(final Collection<JDBCExecutionUnit>
executionUnits, final boolean isTrunkThread) throws SQLException {
+ public final Collection<T> execute(final Collection<JDBCExecutionUnit>
executionUnits, final boolean isTrunkThread, final String processId) throws
SQLException {
// TODO It is better to judge whether need sane result before execute,
can avoid exception thrown
Collection<T> result = new LinkedList<>();
for (JDBCExecutionUnit each : executionUnits) {
- T executeResult = execute(each, isTrunkThread);
+ T executeResult = execute(each, isTrunkThread, processId);
if (null != executeResult) {
result.add(executeResult);
}
@@ -72,7 +72,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
*
* @see <a
href="https://github.com/apache/skywalking/blob/master/docs/en/guides/Java-Plugin-Development-Guide.md#user-content-plugin-development-guide">Plugin
Development Guide</a>
*/
- private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean
isTrunkThread) throws SQLException {
+ private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean
isTrunkThread, final String processId) throws SQLException {
SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
DatabaseType storageType =
resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
ConnectionProperties connectionProps =
resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
@@ -82,7 +82,7 @@ public abstract class JDBCExecutorCallback<T> implements
ExecutorCallback<JDBCEx
sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(),
sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
T result = executeSQL(sqlUnit.getSql(),
jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(),
storageType);
sqlExecutionHook.finishSuccess();
- processEngine.completeSQLUnitExecution(jdbcExecutionUnit);
+ processEngine.completeSQLUnitExecution(jdbcExecutionUnit,
processId);
return result;
} catch (final SQLException ex) {
if (!storageType.equals(protocolType)) {
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
index a5fd69c101c..720ee3202c6 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/RawExecutor.java
@@ -61,7 +61,7 @@ public final class RawExecutor {
List<ExecuteResult> results = execute(executionGroupContext,
(RawSQLExecutorCallback) null, callback);
return results.isEmpty() || null == results.get(0) ?
Collections.singletonList(new UpdateResult(0, 0L)) : results;
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
index 338962123f2..00d3f9f384d 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/raw/callback/RawSQLExecutorCallback.java
@@ -21,7 +21,6 @@ import com.google.common.base.Preconditions;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
@@ -45,12 +44,10 @@ public final class RawSQLExecutorCallback implements
ExecutorCallback<RawSQLExec
@SuppressWarnings("unchecked")
@Override
- public Collection<ExecuteResult> execute(final
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread) throws
SQLException {
+ public Collection<ExecuteResult> execute(final
Collection<RawSQLExecutionUnit> inputs, final boolean isTrunkThread, final
String processId) throws SQLException {
Collection<ExecuteResult> result =
callbacks.iterator().next().execute(inputs, isTrunkThread);
- if (!ProcessIdContext.isEmpty()) {
- for (RawSQLExecutionUnit each : inputs) {
- processEngine.completeSQLUnitExecution(each);
- }
+ for (RawSQLExecutionUnit each : inputs) {
+ processEngine.completeSQLUnitExecution(each, processId);
}
return result;
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
index 4a24993ae1b..5044a92fff1 100644
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
+++
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngine.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.infra.executor.sql.process;
+import com.google.common.base.Strings;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
import
org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupReportContext;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
@@ -44,9 +45,7 @@ public final class ProcessEngine {
* @return process ID
*/
public String connect(final Grantee grantee, final String databaseName) {
- // TODO remove processId return value, and use ProcessIdContext.get()
instead
String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
- ProcessIdContext.set(processId);
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext =
new ExecutionGroupContext<>(Collections.emptyList(), new
ExecutionGroupReportContext(processId, databaseName, grantee));
Process process = new Process(executionGroupContext);
@@ -60,9 +59,7 @@ public final class ProcessEngine {
* @param processId process ID
*/
public void disconnect(final String processId) {
- // TODO remove processId parameter, and use ProcessIdContext.get()
instead
ProcessRegistry.getInstance().remove(processId);
- ProcessIdContext.remove();
}
/**
@@ -81,12 +78,13 @@ public final class ProcessEngine {
* Complete SQL unit execution.
*
* @param executionUnit execution unit
+ * @param processId process ID
*/
- public void completeSQLUnitExecution(final SQLExecutionUnit executionUnit)
{
- if (ProcessIdContext.isEmpty()) {
+ public void completeSQLUnitExecution(final SQLExecutionUnit executionUnit,
final String processId) {
+ if (Strings.isNullOrEmpty(processId)) {
return;
}
- Process process =
ProcessRegistry.getInstance().get(ProcessIdContext.get());
+ Process process = ProcessRegistry.getInstance().get(processId);
if (null == process) {
return;
}
@@ -96,17 +94,19 @@ public final class ProcessEngine {
/**
* Complete SQL execution.
+ *
+ * @param processId process ID
*/
- public void completeSQLExecution() {
- if (ProcessIdContext.isEmpty()) {
+ public void completeSQLExecution(final String processId) {
+ if (Strings.isNullOrEmpty(processId)) {
return;
}
- Process process =
ProcessRegistry.getInstance().get(ProcessIdContext.get());
+ Process process = ProcessRegistry.getInstance().get(processId);
if (null == process) {
return;
}
ExecutionGroupContext<? extends SQLExecutionUnit>
executionGroupContext = new ExecutionGroupContext<>(
- Collections.emptyList(), new
ExecutionGroupReportContext(ProcessIdContext.get(), process.getDatabaseName(),
new Grantee(process.getUsername(), process.getHostname())));
+ Collections.emptyList(), new
ExecutionGroupReportContext(processId, process.getDatabaseName(), new
Grantee(process.getUsername(), process.getHostname())));
ProcessRegistry.getInstance().add(new Process(executionGroupContext));
}
diff --git
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
b/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
deleted file mode 100644
index efe872f4f2e..00000000000
---
a/infra/executor/src/main/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContext.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import com.alibaba.ttl.TransmittableThreadLocal;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
-// TODO should remove the class, process ID should same with connection ID
-/**
- * Process ID context.
- */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class ProcessIdContext {
-
- private static final TransmittableThreadLocal<String> PROCESS_ID = new
TransmittableThreadLocal<>();
-
- /**
- * Judge whether process ID is empty or not.
- *
- * @return whether process ID is empty or not
- */
- public static boolean isEmpty() {
- return null == PROCESS_ID.get();
- }
-
- /**
- * Get process ID.
- *
- * @return process ID
- */
- public static String get() {
- return PROCESS_ID.get();
- }
-
- /**
- * Set process ID.
- *
- * @param processId process ID
- */
- public static void set(final String processId) {
- PROCESS_ID.set(processId);
- }
-
- /**
- * Remove process ID.
- */
- public static void remove() {
- PROCESS_ID.remove();
- }
-}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
index eac1493b680..a4487917bfc 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/kernel/fixture/ExecutorCallbackFixture.java
@@ -31,7 +31,7 @@ public final class ExecutorCallbackFixture implements
ExecutorCallback<Object, S
private final CountDownLatch latch;
@Override
- public Collection<String> execute(final Collection<Object> inputs, final
boolean isTrunkThread) {
+ public Collection<String> execute(final Collection<Object> inputs, final
boolean isTrunkThread, final String processId) {
List<String> result = new LinkedList<>();
inputs.forEach(each -> {
latch.countDown();
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
index 1369779743e..8520296fceb 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/execute/engine/jdbc/JDBCExecutorCallbackTest.java
@@ -40,6 +40,8 @@ import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -81,8 +83,9 @@ class JDBCExecutorCallbackTest {
return Optional.of(saneResult);
}
};
- assertThat(callback.execute(units, true),
is(Collections.singletonList(saneResult)));
- assertThat(callback.execute(units, false),
is(Collections.emptyList()));
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ assertThat(callback.execute(units, true, processId),
is(Collections.singletonList(saneResult)));
+ assertThat(callback.execute(units, false, processId),
is(Collections.emptyList()));
}
@Test
@@ -102,6 +105,7 @@ class JDBCExecutorCallbackTest {
return Optional.empty();
}
};
- assertThrows(SQLException.class, () -> callback.execute(units, true));
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ assertThrows(SQLException.class, () -> callback.execute(units, true,
processId));
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
index f3eb13cc104..4cfbaa4bc35 100644
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
+++
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessEngineTest.java
@@ -79,10 +79,8 @@ class ProcessEngineTest {
@Test
void assertCompleteSQLUnitExecution() {
- ProcessIdContext.set("foo_id");
when(processRegistry.get("foo_id")).thenReturn(mock(Process.class));
- new
ProcessEngine().completeSQLUnitExecution(mock(SQLExecutionUnit.class));
+ new
ProcessEngine().completeSQLUnitExecution(mock(SQLExecutionUnit.class),
"foo_id");
verify(processRegistry).get("foo_id");
- ProcessIdContext.remove();
}
}
diff --git
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
b/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
deleted file mode 100644
index 5a70bc485ba..00000000000
---
a/infra/executor/src/test/java/org/apache/shardingsphere/infra/executor/sql/process/ProcessIdContextTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.executor.sql.process;
-
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class ProcessIdContextTest {
-
- @AfterEach
- void tearDown() {
- ProcessIdContext.remove();
- }
-
- @Test
- void assertIsEmpty() {
- assertTrue(ProcessIdContext.isEmpty());
- ProcessIdContext.set("123e4567e89b12d3a456426655440000");
- assertFalse(ProcessIdContext.isEmpty());
- }
-
- @Test
- void assertGet() {
- assertNull(ProcessIdContext.get());
- ProcessIdContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ProcessIdContext.get(),
is("123e4567e89b12d3a456426655440000"));
- }
-
- @Test
- void assertSet() {
- assertNull(ProcessIdContext.get());
- ProcessIdContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ProcessIdContext.get(),
is("123e4567e89b12d3a456426655440000"));
- ProcessIdContext.set("123e4567e89b12d3a456426655440001");
- assertThat(ProcessIdContext.get(),
is("123e4567e89b12d3a456426655440001"));
- }
-
- @Test
- void assertRemove() {
- assertNull(ProcessIdContext.get());
- ProcessIdContext.set("123e4567e89b12d3a456426655440000");
- assertThat(ProcessIdContext.get(),
is("123e4567e89b12d3a456426655440000"));
- ProcessIdContext.remove();
- assertNull(ProcessIdContext.get());
- }
-}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
index 5ee5584e957..2c5308a90c5 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/DriverJDBCExecutor.java
@@ -75,7 +75,7 @@ public final class DriverJDBCExecutor {
processEngine.executeSQL(executionGroupContext, queryContext);
return jdbcExecutor.execute(executionGroupContext, callback);
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
@@ -99,7 +99,7 @@ public final class DriverJDBCExecutor {
? accumulate(results)
: results.get(0);
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
@@ -137,7 +137,7 @@ public final class DriverJDBCExecutor {
List<Boolean> results = doExecute(executionGroupContext,
queryContext.getSqlStatementContext(), routeUnits, callback);
return null != results && !results.isEmpty() && null !=
results.get(0) && results.get(0);
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
index 9173d58b68c..94c18eccbef 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutor.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorEx
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
@@ -65,11 +64,11 @@ public final class BatchPreparedStatementExecutor {
private final String databaseName;
- public BatchPreparedStatementExecutor(final MetaDataContexts
metaDataContexts, final JDBCExecutor jdbcExecutor, final String databaseName) {
+ public BatchPreparedStatementExecutor(final MetaDataContexts
metaDataContexts, final JDBCExecutor jdbcExecutor, final String databaseName,
final String processId) {
this.databaseName = databaseName;
this.metaDataContexts = metaDataContexts;
this.jdbcExecutor = jdbcExecutor;
- executionGroupContext = new ExecutionGroupContext<>(new
LinkedList<>(), new ExecutionGroupReportContext(ProcessIdContext.get(),
databaseName, new Grantee("", "")));
+ executionGroupContext = new ExecutionGroupContext<>(new
LinkedList<>(), new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", "")));
batchExecutionUnits = new LinkedList<>();
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
index 1614bb0c08e..0d1b4a37456 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/adapter/AbstractStatementAdapter.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOper
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
@@ -49,8 +48,6 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
@Getter(AccessLevel.NONE)
private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new
ForceExecuteTemplate<>();
- private final ProcessEngine processEngine = new ProcessEngine();
-
private boolean poolable;
private int fetchSize;
@@ -93,8 +90,6 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
protected abstract StatementManager getStatementManager();
- protected abstract String getProcessId();
-
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public final void setPoolable(final boolean poolable) throws SQLException {
@@ -234,7 +229,6 @@ public abstract class AbstractStatementAdapter extends
AbstractUnsupportedOperat
}
} finally {
getRoutedStatements().clear();
- processEngine.disconnect(getProcessId());
}
}
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
index 68989e91bfa..532a3743029 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
@@ -24,6 +24,8 @@ import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePrepar
import
org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
import
org.apache.shardingsphere.driver.jdbc.exception.connection.ConnectionClosedException;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
+import org.apache.shardingsphere.infra.metadata.user.Grantee;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.transaction.api.TransactionType;
@@ -42,6 +44,8 @@ import java.sql.Statement;
*/
public final class ShardingSphereConnection extends AbstractConnectionAdapter {
+ private final ProcessEngine processEngine = new ProcessEngine();
+
@Getter
private final String databaseName;
@@ -51,6 +55,9 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
@Getter
private final DriverDatabaseConnectionManager databaseConnectionManager;
+ @Getter
+ private final String processId;
+
private boolean autoCommit = true;
private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
@@ -63,6 +70,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
this.databaseName = databaseName;
this.contextManager = contextManager;
databaseConnectionManager = new
DriverDatabaseConnectionManager(databaseName, contextManager);
+ processId = processEngine.connect(new Grantee("", ""), databaseName);
}
/**
@@ -301,6 +309,7 @@ public final class ShardingSphereConnection extends
AbstractConnectionAdapter {
public void close() throws SQLException {
closed = true;
databaseConnectionManager.close();
+ processEngine.disconnect(processId);
}
private ConnectionContext getConnectionContext() {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 5565a09b206..6f289b7aced 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -149,9 +149,6 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
- @Getter
- private final String processId;
-
@Getter
private final boolean selectContainsEnhancedTable;
@@ -212,13 +209,12 @@ public final class ShardingSpherePreparedStatement
extends AbstractPreparedState
statementOption = returnGeneratedKeys ? new StatementOption(true,
columns) : new StatementOption(resultSetType, resultSetConcurrency,
resultSetHoldability);
executor = new DriverExecutor(connection);
JDBCExecutor jdbcExecutor = new
JDBCExecutor(connection.getContextManager().getExecutorEngine(),
connection.getDatabaseConnectionManager().getConnectionContext());
- batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName);
+ batchPreparedStatementExecutor = new
BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, databaseName,
connection.getProcessId());
kernelProcessor = new KernelProcessor();
statementsCacheable =
isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData());
trafficRule =
metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class);
selectContainsEnhancedTable = sqlStatementContext instanceof
SelectStatementContext && ((SelectStatementContext)
sqlStatementContext).isContainsEnhancedTable();
statementManager = new StatementManager();
- processId = getProcessEngine().connect(new Grantee("", ""),
databaseName);
}
private boolean isStatementsCacheable(final RuleMetaData
databaseRuleMetaData) {
@@ -284,7 +280,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+ prepareEngine.prepare(new RouteContext(),
Collections.singleton(executionUnit), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
if (context.getInputGroups().isEmpty() ||
context.getInputGroups().iterator().next().getInputs().isEmpty()) {
throw new EmptyTrafficExecutionUnitException();
}
@@ -331,7 +327,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ResultSet executeFederationQuery(final QueryContext queryContext) {
PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData());
+ SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(), connection.getProcessId());
return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
}
@@ -456,7 +452,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecutionContext
executionContext) throws SQLException {
@@ -532,7 +528,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", "")));
+ new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
}
@Override
@@ -724,7 +720,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
ExecutionUnit executionUnit = each.getExecutionUnit();
executionUnits.add(executionUnit);
}
-
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", ""))));
+
batchExecutor.init(prepareEngine.prepare(executionContext.getRouteContext(),
executionUnits, new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", ""))));
setBatchParametersForStatements(batchExecutor);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index 7208285bb55..83de9d7a266 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -121,9 +121,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
@Getter(AccessLevel.PROTECTED)
private final StatementManager statementManager;
- @Getter
- private final String processId;
-
private final BatchStatementExecutor batchStatementExecutor;
private boolean returnGeneratedKeys;
@@ -157,7 +154,6 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
statementManager = new StatementManager();
batchStatementExecutor = new BatchStatementExecutor(this);
databaseName = connection.getDatabaseName();
- processId = getProcessEngine().connect(new Grantee("", ""),
databaseName);
}
@Override
@@ -242,7 +238,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData());
+ SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(), connection.getProcessId());
return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
}
@@ -481,7 +477,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new
SQLUnit(queryContext.getSql(), queryContext.getParameters()));
ExecutionGroupContext<JDBCExecutionUnit> context =
- prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+ prepareEngine.prepare(new RouteContext(),
Collections.singletonList(executionUnit), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
return context.getInputGroups().stream().flatMap(each ->
each.getInputs().stream()).findFirst().orElseThrow(EmptyTrafficExecutionUnitException::new);
}
@@ -528,13 +524,13 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
private ExecutionGroupContext<JDBCExecutionUnit>
createExecutionGroupContext(final ExecutionContext executionContext) throws
SQLException {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine();
return prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(),
- new ExecutionGroupReportContext(processId, databaseName, new
Grantee("", "")));
+ new ExecutionGroupReportContext(connection.getProcessId(),
databaseName, new Grantee("", "")));
}
private ExecutionGroupContext<RawSQLExecutionUnit>
createRawExecutionContext(final ExecutionContext executionContext) throws
SQLException {
int maxConnectionsSizePerQuery =
metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
return new RawExecutionPrepareEngine(maxConnectionsSizePerQuery,
metaDataContexts.getMetaData().getDatabase(databaseName).getRuleMetaData().getRules())
- .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(processId, databaseName, new Grantee("", "")));
+ .prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits(), new
ExecutionGroupReportContext(connection.getProcessId(), databaseName, new
Grantee("", "")));
}
private boolean executeWithExecutionContext(final ExecuteCallback
executeCallback, final ExecutionContext executionContext) throws SQLException {
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
index e179f53f3b2..ac096d5e97a 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/state/circuit/statement/CircuitBreakerPreparedStatement.java
@@ -284,11 +284,6 @@ public final class CircuitBreakerPreparedStatement extends
AbstractUnsupportedOp
return null;
}
- @Override
- protected String getProcessId() {
- return null;
- }
-
@Override
public ResultSet executeQuery() {
return new CircuitBreakerResultSet();
diff --git
a/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
b/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
index 6c8cb0acb12..23a6fc48c01 100644
---
a/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
+++
b/jdbc/src/test/java/org/apache/shardingsphere/driver/executor/batch/BatchPreparedStatementExecutorTest.java
@@ -88,8 +88,9 @@ class BatchPreparedStatementExecutorTest {
void setUp() {
SQLExecutorExceptionHandler.setExceptionThrown(true);
ShardingSphereConnection connection = new
ShardingSphereConnection("foo_db", mockContextManager());
- executor = new BatchPreparedStatementExecutor(
- connection.getContextManager().getMetaDataContexts(), new
JDBCExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext()), "foo_db");
+ String processId = new UUID(ThreadLocalRandom.current().nextLong(),
ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
+ executor = new
BatchPreparedStatementExecutor(connection.getContextManager().getMetaDataContexts(),
+ new JDBCExecutor(executorEngine,
connection.getDatabaseConnectionManager().getConnectionContext()), "foo_db",
processId);
when(sqlStatementContext.getTablesContext()).thenReturn(mock(TablesContext.class));
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
index cb3557269d8..247cccfa59e 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
@@ -40,4 +40,6 @@ public final class SQLFederationExecutorContext {
private final QueryContext queryContext;
private final ShardingSphereMetaData metaData;
+
+ private final String processId;
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index 6bb47cac627..01fd2362172 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -41,7 +41,6 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
-import org.apache.shardingsphere.infra.executor.sql.process.ProcessIdContext;
import org.apache.shardingsphere.infra.hint.HintValueContext;
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
@@ -142,7 +141,7 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
try {
return createEnumerable(queryContext, database, context);
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(federationContext.getProcessId());
}
}
@@ -155,7 +154,7 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
computeConnectionOffsets(context);
// TODO pass grantee from proxy and jdbc adapter
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext
= prepareEngine.prepare(context.getRouteContext(),
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
- new
ExecutionGroupReportContext(ProcessIdContext.get(), database.getName(), new
Grantee("", "")));
+ new
ExecutionGroupReportContext(executorContext.getFederationContext().getProcessId(),
database.getName(), new Grantee("", "")));
setParameters(executionGroupContext.getInputGroups());
processEngine.executeSQL(executionGroupContext,
context.getQueryContext());
List<QueryResult> queryResults =
jdbcExecutor.execute(executionGroupContext,
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 7bfa9e62012..ab30083ad4b 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -231,7 +231,7 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType,
database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), this,
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaDataContexts);
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData());
+ SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(),
databaseConnectionManager.getConnectionSession().getProcessId());
return
proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback,
context);
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
index 27ab4572437..e8800671f90 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/executor/ProxyJDBCExecutor.java
@@ -78,7 +78,7 @@ public final class ProxyJDBCExecutor {
isExceptionThrown,
false));
} finally {
- processEngine.completeSQLExecution();
+
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
}
}
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
index 8f450a881ea..d33f7c8fc6c 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorConnection
import
org.apache.shardingsphere.distsql.handler.aware.DistSQLExecutorDatabaseAware;
import
org.apache.shardingsphere.distsql.handler.engine.DistSQLConnectionContext;
import
org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
+import
org.apache.shardingsphere.distsql.handler.exception.rule.RuleNotExistedException;
import org.apache.shardingsphere.distsql.statement.rul.sql.PreviewStatement;
import
org.apache.shardingsphere.infra.binder.context.aware.CursorDefinitionAware;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
@@ -54,8 +55,8 @@ import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
+import
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
-import
org.apache.shardingsphere.distsql.handler.exception.rule.RuleNotExistedException;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
@@ -129,7 +130,8 @@ public final class PreviewExecutor implements
DistSQLQueryExecutor<PreviewStatem
// TODO move dialect MySQLInsertStatement into database type module
@zhangliang
boolean isReturnGeneratedKeys = sqlStatement instanceof
MySQLInsertStatement;
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaData.getProps());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(true, queryContext, metaData);
+ SQLFederationExecutorContext context = new
SQLFederationExecutorContext(true, queryContext, metaData,
+ ((ProxyDatabaseConnectionManager)
connectionContext.getDatabaseConnectionManager()).getConnectionSession().getProcessId());
federationEngine.executeQuery(prepareEngine,
createPreviewCallback(sqlStatement), context);
return context.getPreviewExecutionUnits();
}
diff --git
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
index fc074fb2788..b168a20ca09 100644
---
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
+++
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
@@ -85,7 +85,8 @@ public final class OpenGaussSystemCatalogAdminQueryExecutor
implements DatabaseA
JDBCExecutor jdbcExecutor = new
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(),
connectionSession.getConnectionContext());
try (SQLFederationEngine sqlFederationEngine = new
SQLFederationEngine(databaseName, PG_CATALOG, metaDataContexts.getMetaData(),
metaDataContexts.getStatistics(), jdbcExecutor)) {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(metaDataContexts,
connectionSession);
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, new QueryContext(sqlStatementContext, sql,
parameters), metaDataContexts.getMetaData());
+ SQLFederationExecutorContext context =
+ new SQLFederationExecutorContext(false, new
QueryContext(sqlStatementContext, sql, parameters),
metaDataContexts.getMetaData(), connectionSession.getProcessId());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
ResultSet resultSet =
sqlFederationEngine.executeQuery(prepareEngine,
createOpenGaussSystemCatalogAdminQueryCallback(database.getProtocolType(),
database.getResourceMetaData(), sqlStatementContext.getSqlStatement()),
context);