This is an automated email from the ASF dual-hosted git repository.
zhangyonglun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1bb0aed Decouple QueryResponse and QueryResult (#8425)
1bb0aed is described below
commit 1bb0aedcfce6e6a7c66622739ca4d07836508df0
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 30 21:23:06 2020 +0800
Decouple QueryResponse and QueryResult (#8425)
* Refactor ShowTablesBackendHandler
* Refactor JDBCDatabaseCommunicationEngine
* Move QueryHeader creation to JDBCDatabaseCommunicationEngine
* Refactor JDBCDatabaseCommunicationEngine
* Refactor JDBCDatabaseCommunicationEngine
* Remove useless QueryResponse.queryResults
* Remove useless QueryResponse.queryResults
---
.../backend/communication/ProxySQLExecutor.java | 72 ++-----------------
.../jdbc/JDBCDatabaseCommunicationEngine.java | 80 +++++++++++++++++-----
.../backend/response/query/QueryResponse.java | 4 --
.../text/admin/ShowTablesBackendHandler.java | 1 -
4 files changed, 67 insertions(+), 90 deletions(-)
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
index 816ec91..29e00b7 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/ProxySQLExecutor.java
@@ -17,8 +17,6 @@
package org.apache.shardingsphere.proxy.backend.communication;
-import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
-import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
@@ -29,11 +27,9 @@ import
org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.J
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.RawSQLExecutionUnit;
import
org.apache.shardingsphere.infra.executor.sql.execute.engine.raw.callback.RawSQLExecutorCallback;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.DriverExecutionPrepareEngine;
import
org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.StatementOption;
import
org.apache.shardingsphere.infra.executor.sql.prepare.raw.RawExecutionPrepareEngine;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.RawExecutionRule;
import
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
@@ -43,23 +39,14 @@ import
org.apache.shardingsphere.proxy.backend.communication.raw.ProxyRawExecuto
import org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import
org.apache.shardingsphere.proxy.backend.exception.TableModifyInTransactionException;
-import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
-import
org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
-import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
-import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.ddl.DDLStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
-import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import org.apache.shardingsphere.transaction.core.TransactionType;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
/**
* Proxy SQL Executor.
@@ -103,22 +90,16 @@ public final class ProxySQLExecutor {
* Execute SQL.
*
* @param executionContext execution context
- * @return execute response
+ * @return execute results
* @throws SQLException SQL exception
*/
- public BackendResponse execute(final ExecutionContext executionContext)
throws SQLException {
- Collection<ExecuteResult> executeResults = execute(executionContext,
- executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- ExecuteResult executeResultSample = executeResults.iterator().next();
- return executeResultSample instanceof QueryResult
- ? processExecuteQuery(executionContext, executeResults,
(QueryResult) executeResultSample) : processExecuteUpdate(executionContext,
executeResults);
- }
-
- private Collection<ExecuteResult> execute(final ExecutionContext
executionContext, final boolean isReturnGeneratedKeys, final boolean
isExceptionThrown) throws SQLException {
- int maxConnectionsSizePerQuery =
ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ public Collection<ExecuteResult> execute(final ExecutionContext
executionContext) throws SQLException {
Collection<ShardingSphereRule> rules =
ProxyContext.getInstance().getMetaDataContexts().getMetaDataMap().get(backendConnection.getSchemaName()).getRuleMetaData().getRules();
+ int maxConnectionsSizePerQuery =
ProxyContext.getInstance().getMetaDataContexts().getProps().<Integer>getValue(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY);
+ boolean isReturnGeneratedKeys =
executionContext.getSqlStatementContext().getSqlStatement() instanceof
InsertStatement;
return rules.stream().anyMatch(each -> each instanceof
RawExecutionRule)
- ? rawExecute(executionContext, rules,
maxConnectionsSizePerQuery) : useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys, isExceptionThrown);
+ ? rawExecute(executionContext, rules,
maxConnectionsSizePerQuery)
+ : useDriverToExecute(executionContext, rules,
maxConnectionsSizePerQuery, isReturnGeneratedKeys,
SQLExecutorExceptionHandler.isExceptionThrown());
}
private Collection<ExecuteResult> rawExecute(final ExecutionContext
executionContext, final Collection<ShardingSphereRule> rules, final int
maxConnectionsSizePerQuery) throws SQLException {
@@ -135,45 +116,4 @@ public final class ProxySQLExecutor {
Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups =
prepareEngine.prepare(executionContext.getRouteContext(),
executionContext.getExecutionUnits());
return jdbcExecutor.execute(executionGroups, isExceptionThrown,
isReturnGeneratedKeys);
}
-
- private BackendResponse processExecuteQuery(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults, final
QueryResult executeResultSample) throws SQLException {
- ShardingSphereMetaData metaData =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName());
- int columnCount = executeResultSample.getMetaData().getColumnCount();
- List<QueryHeader> queryHeaders = new ArrayList<>(columnCount);
- for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
- queryHeaders.add(createQueryHeader(executionContext,
executeResultSample, metaData, columnIndex));
- }
- return createQueryResponse(queryHeaders, executeResults);
- }
-
- private QueryHeader createQueryHeader(final ExecutionContext
executionContext,
- final QueryResult
executeResultSample, final ShardingSphereMetaData metaData, final int
columnIndex) throws SQLException {
- return
hasSelectExpandProjections(executionContext.getSqlStatementContext())
- ? QueryHeaderBuilder.build(((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
executeResultSample, metaData, columnIndex)
- : QueryHeaderBuilder.build(executeResultSample, metaData,
columnIndex);
- }
-
- private boolean hasSelectExpandProjections(final SQLStatementContext<?>
sqlStatementContext) {
- return sqlStatementContext instanceof SelectStatementContext &&
!((SelectStatementContext)
sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
- }
-
- private BackendResponse createQueryResponse(final List<QueryHeader>
queryHeaders, final Collection<ExecuteResult> executeResults) {
- QueryResponse result = new QueryResponse(queryHeaders);
- for (ExecuteResult each : executeResults) {
- result.getQueryResults().add((QueryResult) each);
- }
- return result;
- }
-
- private UpdateResponse processExecuteUpdate(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) {
- UpdateResponse result = new UpdateResponse(executeResults);
- if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
- result.setType("INSERT");
- } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof DeleteStatement) {
- result.setType("DELETE");
- } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof UpdateStatement) {
- result.setType("UPDATE");
- }
- return result;
- }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
index 751c2d9..038de73 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/JDBCDatabaseCommunicationEngine.java
@@ -20,9 +20,11 @@ package
org.apache.shardingsphere.proxy.backend.communication.jdbc;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import
org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.context.kernel.KernelProcessor;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
+import
org.apache.shardingsphere.infra.executor.sql.execute.result.ExecuteResult;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.infra.executor.sql.log.SQLLogger;
import org.apache.shardingsphere.infra.merge.MergeEngine;
@@ -41,9 +43,13 @@ import
org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
import org.apache.shardingsphere.proxy.backend.response.query.QueryData;
import org.apache.shardingsphere.proxy.backend.response.query.QueryHeader;
+import
org.apache.shardingsphere.proxy.backend.response.query.QueryHeaderBuilder;
import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse;
import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DeleteStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.InsertStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.UpdateStatement;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -67,7 +73,7 @@ public final class JDBCDatabaseCommunicationEngine implements
DatabaseCommunicat
private final KernelProcessor kernelProcessor = new KernelProcessor();
- private BackendResponse response;
+ private List<QueryHeader> queryHeaders;
private MergedResult mergedResult;
@@ -89,10 +95,55 @@ public final class JDBCDatabaseCommunicationEngine
implements DatabaseCommunicat
return new UpdateResponse();
}
proxySQLExecutor.checkExecutePrerequisites(executionContext);
- response = proxySQLExecutor.execute(executionContext);
+ Collection<ExecuteResult> executeResults =
proxySQLExecutor.execute(executionContext);
+ ExecuteResult executeResultSample = executeResults.iterator().next();
+ return executeResultSample instanceof QueryResult
+ ? processExecuteQuery(executionContext, executeResults,
(QueryResult) executeResultSample) : processExecuteUpdate(executionContext,
executeResults);
+ }
+
+ private QueryResponse processExecuteQuery(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults, final
QueryResult executeResultSample) throws SQLException {
+ queryHeaders = createQueryHeaders(executionContext,
executeResultSample);
+ mergedResult = mergeQuery(executionContext.getSqlStatementContext(),
executeResults.stream().map(each -> (QueryResult)
each).collect(Collectors.toList()));
+ return new QueryResponse(queryHeaders);
+ }
+
+ private List<QueryHeader> createQueryHeaders(final ExecutionContext
executionContext, final QueryResult executeResultSample) throws SQLException {
+ int columnCount = executeResultSample.getMetaData().getColumnCount();
+ List<QueryHeader> result = new ArrayList<>(columnCount);
+ for (int columnIndex = 1; columnIndex <= columnCount; columnIndex++) {
+ result.add(createQueryHeader(executionContext,
executeResultSample, metaData, columnIndex));
+ }
+ return result;
+ }
+
+ private QueryHeader createQueryHeader(final ExecutionContext
executionContext,
+ final QueryResult
executeResultSample, final ShardingSphereMetaData metaData, final int
columnIndex) throws SQLException {
+ return
hasSelectExpandProjections(executionContext.getSqlStatementContext())
+ ? QueryHeaderBuilder.build(((SelectStatementContext)
executionContext.getSqlStatementContext()).getProjectionsContext(),
executeResultSample, metaData, columnIndex)
+ : QueryHeaderBuilder.build(executeResultSample, metaData,
columnIndex);
+ }
+
+ private boolean hasSelectExpandProjections(final SQLStatementContext<?>
sqlStatementContext) {
+ return sqlStatementContext instanceof SelectStatementContext &&
!((SelectStatementContext)
sqlStatementContext).getProjectionsContext().getExpandProjections().isEmpty();
+ }
+
+ private UpdateResponse processExecuteUpdate(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) throws
SQLException {
+ UpdateResponse result = createUpdateResponse(executionContext,
executeResults);
refreshSchema(executionContext);
- merge(executionContext.getSqlStatementContext());
- return response;
+ mergeUpdateCount(executionContext.getSqlStatementContext(), result);
+ return result;
+ }
+
+ private UpdateResponse createUpdateResponse(final ExecutionContext
executionContext, final Collection<ExecuteResult> executeResults) {
+ UpdateResponse result = new UpdateResponse(executeResults);
+ if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof InsertStatement) {
+ result.setType("INSERT");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof DeleteStatement) {
+ result.setType("DELETE");
+ } else if (executionContext.getSqlStatementContext().getSqlStatement()
instanceof UpdateStatement) {
+ result.setType("UPDATE");
+ }
+ return result;
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -115,17 +166,15 @@ public final class JDBCDatabaseCommunicationEngine
implements DatabaseCommunicat
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema),
SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName,
schema));
}
- private void merge(final SQLStatementContext<?> sqlStatementContext)
throws SQLException {
- if (response instanceof UpdateResponse) {
- mergeUpdateCount(sqlStatementContext);
- return;
- }
- mergedResult = mergeQuery(sqlStatementContext, ((QueryResponse)
response).getQueryResults());
+ private MergedResult mergeQuery(final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
+ MergeEngine mergeEngine = new
MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
+ metaData.getSchema(),
ProxyContext.getInstance().getMetaDataContexts().getProps(),
metaData.getRuleMetaData().getRules());
+ return mergeEngine.merge(queryResults, sqlStatementContext);
}
- private void mergeUpdateCount(final SQLStatementContext<?>
sqlStatementContext) {
+ private void mergeUpdateCount(final SQLStatementContext<?>
sqlStatementContext, final UpdateResponse response) {
if (isNeedAccumulate(sqlStatementContext)) {
- ((UpdateResponse) response).mergeUpdateCount();
+ response.mergeUpdateCount();
}
}
@@ -135,12 +184,6 @@ public final class JDBCDatabaseCommunicationEngine
implements DatabaseCommunicat
return dataNodeContainedRule.isPresent() &&
dataNodeContainedRule.get().isNeedAccumulate(sqlStatementContext.getTablesContext().getTableNames());
}
- private MergedResult mergeQuery(final SQLStatementContext<?>
sqlStatementContext, final List<QueryResult> queryResults) throws SQLException {
- MergeEngine mergeEngine = new
MergeEngine(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
- metaData.getSchema(),
ProxyContext.getInstance().getMetaDataContexts().getProps(),
metaData.getRuleMetaData().getRules());
- return mergeEngine.merge(queryResults, sqlStatementContext);
- }
-
@Override
public boolean next() throws SQLException {
return null != mergedResult && mergedResult.next();
@@ -148,7 +191,6 @@ public final class JDBCDatabaseCommunicationEngine
implements DatabaseCommunicat
@Override
public QueryData getQueryData() throws SQLException {
- List<QueryHeader> queryHeaders = ((QueryResponse)
response).getQueryHeaders();
List<Object> row = new ArrayList<>(queryHeaders.size());
for (int columnIndex = 1; columnIndex <= queryHeaders.size();
columnIndex++) {
row.add(mergedResult.getValue(columnIndex, Object.class));
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
index 5d3eb1f..63acc61 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/response/query/QueryResponse.java
@@ -19,10 +19,8 @@ package
org.apache.shardingsphere.proxy.backend.response.query;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
import org.apache.shardingsphere.proxy.backend.response.BackendResponse;
-import java.util.LinkedList;
import java.util.List;
/**
@@ -33,6 +31,4 @@ import java.util.List;
public final class QueryResponse implements BackendResponse {
private final List<QueryHeader> queryHeaders;
-
- private final List<QueryResult> queryResults = new LinkedList<>();
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
index 1aa0001..0bbc38a 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/admin/ShowTablesBackendHandler.java
@@ -59,7 +59,6 @@ public final class ShowTablesBackendHandler implements
TextProtocolBackendHandle
Collection<String> allTableNames =
ProxyContext.getInstance().getMetaData(backendConnection.getSchemaName()).getSchema().getAllTableNames();
List<MemoryQueryResultDataRow> rows = allTableNames.stream().map(each
-> new
MemoryQueryResultDataRow(Collections.singletonList(each))).collect(Collectors.toList());
queryResult = new RawMemoryQueryResult(metaData, rows);
- result.getQueryResults().add(queryResult);
return result;
}