This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9dcf095ecba Refactor sql federation executor logic for code style
(#30561)
9dcf095ecba is described below
commit 9dcf095ecba5ec29cc2e4924bef14a3ff4b17575
Author: Zhengqiang Duan <[email protected]>
AuthorDate: Wed Mar 20 18:27:09 2024 +0800
Refactor sql federation executor logic for code style (#30561)
* Bind oracle merge source subquery table projection parameters
* Refactor sql federation executor logic
* fix unit test
* rename method name
---
.../statement/ShardingSpherePreparedStatement.java | 4 +-
.../core/statement/ShardingSphereStatement.java | 4 +-
.../sqlfederation/engine/SQLFederationEngine.java | 23 +++--
.../EnumerableConstants.java} | 34 ++++---
.../SQLFederationBindContext.java} | 6 +-
.../SQLFederationContext.java} | 8 +-
.../SQLFederationExecutorContext.java} | 8 +-
.../enumerable/EnumerableScanExecutor.java | 113 +++++----------------
.../JDBCRowEnumerator.java} | 6 +-
.../executor/enumerator/MemoryRowEnumerator.java | 75 ++++++++++++++
.../EnumeratorUtils.java} | 85 +++++-----------
.../executor/utils/StatisticsAssembleUtils.java | 91 +++++++++++++++++
.../enumerable/EnumerableScanExecutorTest.java | 6 +-
.../metadata/schema/SQLFederationTable.java | 4 +-
.../proxy/backend/connector/DatabaseConnector.java | 4 +-
.../handler/distsql/rul/PreviewExecutor.java | 4 +-
.../OpenGaussSystemCatalogAdminQueryExecutor.java | 6 +-
.../cases/dql/dql-integration-select-combine.xml | 10 +-
...gration-select-pagination-group-by-order-by.xml | 5 +-
19 files changed, 284 insertions(+), 212 deletions(-)
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
index 7c33d2b42ce..065bcd89979 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
@@ -84,7 +84,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import
org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.rule.TrafficRule;
@@ -328,7 +328,7 @@ public final class ShardingSpherePreparedStatement extends
AbstractPreparedState
private ResultSet executeFederationQuery(final QueryContext queryContext) {
PreparedStatementExecuteQueryCallback callback = new
PreparedStatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
sqlStatement, SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(), connection.getProcessId());
+ SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
}
diff --git
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
index d0a126f646c..5d4c85bb53f 100644
---
a/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
+++
b/jdbc/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java
@@ -78,7 +78,7 @@ import
org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.traffic.engine.TrafficEngine;
import
org.apache.shardingsphere.traffic.exception.metadata.EmptyTrafficExecutionUnitException;
import org.apache.shardingsphere.traffic.executor.TrafficExecutorCallback;
@@ -240,7 +240,7 @@ public final class ShardingSphereStatement extends
AbstractStatementAdapter {
StatementExecuteQueryCallback callback = new
StatementExecuteQueryCallback(metaDataContexts.getMetaData().getDatabase(databaseName).getProtocolType(),
metaDataContexts.getMetaData().getDatabase(databaseName).getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(),
SQLExecutorExceptionHandler.isExceptionThrown());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(), connection.getProcessId());
+ SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(), connection.getProcessId());
return
executor.getSqlFederationEngine().executeQuery(createDriverExecutionPrepareEngine(),
callback, context);
}
diff --git
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
index fb88e6d43fb..50db67ca180 100644
---
a/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
+++
b/kernel/sql-federation/core/src/main/java/org/apache/shardingsphere/sqlfederation/engine/SQLFederationEngine.java
@@ -45,9 +45,9 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.util.SystemSchem
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationDataContext;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
-import
org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationBindContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
import
org.apache.shardingsphere.sqlfederation.executor.enumerable.EnumerableScanExecutor;
import
org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationCompilerEngine;
import
org.apache.shardingsphere.sqlfederation.optimizer.SQLFederationExecutionPlan;
@@ -161,7 +161,7 @@ public final class SQLFederationEngine implements
AutoCloseable {
* @throws SQLFederationUnsupportedSQLException SQL federation unsupported
SQL exception
*/
public ResultSet executeQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<? extends
ExecuteResult> callback, final SQLFederationExecutorContext federationContext) {
+ final JDBCExecutorCallback<? extends
ExecuteResult> callback, final SQLFederationContext federationContext) {
try {
String databaseName =
federationContext.getQueryContext().getDatabaseNameFromSQLStatement().orElse(this.databaseName);
String schemaName =
federationContext.getQueryContext().getSchemaNameFromSQLStatement().orElse(this.schemaName);
@@ -176,7 +176,7 @@ public final class SQLFederationEngine implements
AutoCloseable {
}
private SQLFederationExecutionPlan compileQuery(final
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final
JDBCExecutorCallback<? extends ExecuteResult> callback, final
SQLFederationExecutorContext federationContext, final String databaseName,
+ final
JDBCExecutorCallback<? extends ExecuteResult> callback, final
SQLFederationContext federationContext, final String databaseName,
final String schemaName) {
SQLStatementContext sqlStatementContext =
federationContext.getQueryContext().getSqlStatementContext();
ShardingSpherePreconditions.checkState(sqlStatementContext instanceof
SelectStatementContext, () -> new IllegalArgumentException("SQL statement
context must be select statement context."));
@@ -191,12 +191,12 @@ public final class SQLFederationEngine implements
AutoCloseable {
}
@SuppressWarnings("unchecked")
- private ResultSet executePlan(final SQLFederationExecutorContext
federationContext, final SQLFederationExecutionPlan executionPlan, final String
databaseName, final String schemaName) {
+ private ResultSet executePlan(final SQLFederationContext
federationContext, final SQLFederationExecutionPlan executionPlan, final String
databaseName, final String schemaName) {
try {
Bindable<Object> executablePlan =
EnumerableInterpretable.toBindable(Collections.emptyMap(), null,
(EnumerableRel) executionPlan.getPhysicalPlan(), EnumerableRel.Prefer.ARRAY);
Map<String, Object> params =
createParameters(federationContext.getQueryContext().getParameters());
OptimizerPlannerContext plannerContext =
sqlFederationRule.getOptimizerContext().getPlannerContext(databaseName);
- Enumerator<Object> enumerator = executablePlan.bind(new
SQLFederationDataContext(plannerContext.getValidator(schemaName),
plannerContext.getConverter(schemaName), params)).enumerator();
+ Enumerator<Object> enumerator = executablePlan.bind(new
SQLFederationBindContext(plannerContext.getValidator(schemaName),
plannerContext.getConverter(schemaName), params)).enumerator();
ShardingSphereSchema schema =
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
Schema sqlFederationSchema =
plannerContext.getValidator(schemaName).getCatalogReader().getRootSchema().plus().getSubSchema(schemaName);
return new SQLFederationResultSet(enumerator, schema,
sqlFederationSchema, (SelectStatementContext)
federationContext.getQueryContext().getSqlStatementContext(),
@@ -206,7 +206,7 @@ public final class SQLFederationEngine implements
AutoCloseable {
}
}
- private ExecutionPlanCacheKey buildCacheKey(final
SQLFederationExecutorContext federationContext, final SelectStatementContext
selectStatementContext,
+ private ExecutionPlanCacheKey buildCacheKey(final SQLFederationContext
federationContext, final SelectStatementContext selectStatementContext,
final SQLStatementCompiler
sqlStatementCompiler, final String databaseName, final String schemaName) {
ShardingSphereSchema schema =
federationContext.getMetaData().getDatabase(databaseName).getSchema(schemaName);
ExecutionPlanCacheKey result =
@@ -222,13 +222,14 @@ public final class SQLFederationEngine implements
AutoCloseable {
}
private void registerTableScanExecutor(final Schema sqlFederationSchema,
final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
- final JDBCExecutorCallback<?
extends ExecuteResult> callback, final SQLFederationExecutorContext
federationContext,
+ final JDBCExecutorCallback<?
extends ExecuteResult> callback, final SQLFederationContext federationContext,
final OptimizerContext
optimizerContext, final String databaseName, final String schemaName) {
if (null == sqlFederationSchema) {
return;
}
- TableScanExecutorContext executorContext = new
TableScanExecutorContext(databaseName, schemaName, metaData.getProps(),
federationContext);
- EnumerableScanExecutor scanExecutor = new
EnumerableScanExecutor(prepareEngine, jdbcExecutor, callback, optimizerContext,
metaData.getGlobalRuleMetaData(), executorContext, statistics);
+ SQLFederationExecutorContext executorContext = new
SQLFederationExecutorContext(databaseName, schemaName, metaData.getProps());
+ EnumerableScanExecutor scanExecutor =
+ new EnumerableScanExecutor(prepareEngine, jdbcExecutor,
callback, optimizerContext, executorContext, federationContext,
metaData.getGlobalRuleMetaData(), statistics);
// TODO register only the required tables
for (ShardingSphereTable each :
metaData.getDatabase(databaseName).getSchema(schemaName).getTables().values()) {
Table table = sqlFederationSchema.getTable(each.getName());
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/constant/EnumerableConstants.java
similarity index 52%
copy from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
copy to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/constant/EnumerableConstants.java
index 247cccfa59e..fec78da59fd 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/constant/EnumerableConstants.java
@@ -15,31 +15,33 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor;
+package org.apache.shardingsphere.sqlfederation.executor.constant;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
-import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import com.cedarsoftware.util.CaseInsensitiveSet;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import java.util.Collection;
-import java.util.LinkedList;
/**
- * SQL federation executor context.
+ * Enumerable constants.
*/
-@RequiredArgsConstructor
-@Getter
-public final class SQLFederationExecutorContext {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class EnumerableConstants {
- private final Collection<ExecutionUnit> previewExecutionUnits = new
LinkedList<>();
+ public static final Collection<String> SYSTEM_CATALOG_TABLES = new
CaseInsensitiveSet<>(3, 1F);
- private final boolean preview;
+ public static final String DAT_COMPATIBILITY = "PG";
- private final QueryContext queryContext;
+ public static final String PG_DATABASE = "pg_database";
- private final ShardingSphereMetaData metaData;
+ public static final String PG_TABLES = "pg_tables";
- private final String processId;
+ public static final String PG_ROLES = "pg_roles";
+
+ static {
+ SYSTEM_CATALOG_TABLES.add(PG_DATABASE);
+ SYSTEM_CATALOG_TABLES.add(PG_TABLES);
+ SYSTEM_CATALOG_TABLES.add(PG_ROLES);
+ }
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationDataContext.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
similarity index 92%
rename from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationDataContext.java
rename to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
index 2467ac2cf22..718f30c0540 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationDataContext.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationBindContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor;
+package org.apache.shardingsphere.sqlfederation.executor.context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -29,10 +29,10 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
import java.util.Map;
/**
- * SQL federation data context.
+ * SQL federation bind context.
*/
@RequiredArgsConstructor
-public final class SQLFederationDataContext implements DataContext {
+public final class SQLFederationBindContext implements DataContext {
private final SqlValidator validator;
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
similarity index 91%
rename from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
rename to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
index 247cccfa59e..bce17720cdc 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/SQLFederationExecutorContext.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationContext.java
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor;
+package org.apache.shardingsphere.sqlfederation.executor.context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import org.apache.shardingsphere.infra.session.query.QueryContext;
import java.util.Collection;
import java.util.LinkedList;
/**
- * SQL federation executor context.
+ * SQL federation context.
*/
@RequiredArgsConstructor
@Getter
-public final class SQLFederationExecutorContext {
+public final class SQLFederationContext {
private final Collection<ExecutionUnit> previewExecutionUnits = new
LinkedList<>();
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
similarity index 86%
rename from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
rename to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
index 47df2132b41..8b24e5e4016 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/TableScanExecutorContext.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/context/SQLFederationExecutorContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor;
+package org.apache.shardingsphere.sqlfederation.executor.context;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -25,11 +25,11 @@ import java.util.LinkedHashMap;
import java.util.Map;
/**
- * Table scan executor context.
+ * SQL federation executor context.
*/
@RequiredArgsConstructor
@Getter
-public final class TableScanExecutorContext {
+public final class SQLFederationExecutorContext {
private final String databaseName;
@@ -37,7 +37,5 @@ public final class TableScanExecutorContext {
private final ConfigurationProperties props;
- private final SQLFederationExecutorContext federationContext;
-
private final Map<String, Integer> connectionOffsets = new
LinkedHashMap<>();
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
index fde84dcd9dc..c2baacc045b 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutor.java
@@ -17,13 +17,11 @@
package org.apache.shardingsphere.sqlfederation.executor.enumerable;
-import com.cedarsoftware.util.CaseInsensitiveSet;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
-import org.apache.shardingsphere.authority.rule.AuthorityRule;
import
org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.engine.SQLBindEngine;
import org.apache.shardingsphere.infra.connection.kernel.KernelProcessor;
@@ -51,22 +49,21 @@ import
org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.metadata.user.Grantee;
-import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
import org.apache.shardingsphere.infra.parser.sql.SQLStatementParserEngine;
import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
import org.apache.shardingsphere.infra.session.query.QueryContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
-import
org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
-import org.apache.shardingsphere.sqlfederation.executor.row.MemoryEnumerator;
-import
org.apache.shardingsphere.sqlfederation.executor.row.SQLFederationRowEnumerator;
+import
org.apache.shardingsphere.sqlfederation.executor.constant.EnumerableConstants;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.enumerator.JDBCRowEnumerator;
+import
org.apache.shardingsphere.sqlfederation.executor.enumerator.MemoryRowEnumerator;
+import
org.apache.shardingsphere.sqlfederation.executor.utils.StatisticsAssembleUtils;
import
org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
@@ -77,13 +74,10 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -93,16 +87,6 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor
public final class EnumerableScanExecutor implements ScanExecutor {
- private static final Collection<String> SYSTEM_CATALOG_TABLES = new
CaseInsensitiveSet<>(3, 1F);
-
- private static final String DAT_COMPATIBILITY = "PG";
-
- private static final String PG_DATABASE = "pg_database";
-
- private static final String PG_TABLES = "pg_tables";
-
- private static final String PG_ROLES = "pg_roles";
-
private final DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine;
private final JDBCExecutor jdbcExecutor;
@@ -111,29 +95,24 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
private final OptimizerContext optimizerContext;
- private final RuleMetaData globalRuleMetaData;
+ private final SQLFederationExecutorContext executorContext;
- private final TableScanExecutorContext executorContext;
+ private final SQLFederationContext federationContext;
+
+ private final RuleMetaData globalRuleMetaData;
private final ShardingSphereStatistics statistics;
private final ProcessEngine processEngine = new ProcessEngine();
- static {
- SYSTEM_CATALOG_TABLES.add(PG_DATABASE);
- SYSTEM_CATALOG_TABLES.add(PG_TABLES);
- SYSTEM_CATALOG_TABLES.add(PG_ROLES);
- }
-
@Override
public Enumerable<Object> execute(final ShardingSphereTable table, final
ScanExecutorContext scanContext) {
String databaseName = executorContext.getDatabaseName();
String schemaName = executorContext.getSchemaName();
DatabaseType databaseType =
optimizerContext.getParserContext(databaseName).getDatabaseType();
if (new
SystemDatabase(databaseType).getSystemSchemas().contains(schemaName)) {
- return executeByShardingSphereData(databaseName, schemaName,
table, databaseType);
+ return createMemoryEnumerable(databaseName, schemaName, table,
databaseType);
}
- SQLFederationExecutorContext federationContext =
executorContext.getFederationContext();
QueryContext queryContext =
createQueryContext(federationContext.getMetaData(), scanContext, databaseType,
federationContext.getQueryContext().isUseCache());
ShardingSphereDatabase database =
federationContext.getMetaData().getDatabase(databaseName);
ExecutionContext context = new
KernelProcessor().generateExecutionContext(queryContext, database,
globalRuleMetaData, executorContext.getProps(), new ConnectionContext());
@@ -141,10 +120,10 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
federationContext.getPreviewExecutionUnits().addAll(context.getExecutionUnits());
return createEmptyEnumerable();
}
- return createEnumerable(queryContext, database, context);
+ return createJDBCEnumerable(queryContext, database, context);
}
- private AbstractEnumerable<Object> createEnumerable(final QueryContext
queryContext, final ShardingSphereDatabase database, final ExecutionContext
context) {
+ private AbstractEnumerable<Object> createJDBCEnumerable(final QueryContext
queryContext, final ShardingSphereDatabase database, final ExecutionContext
context) {
return new AbstractEnumerable<Object>() {
@SneakyThrows
@@ -153,16 +132,16 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
computeConnectionOffsets(context);
// TODO pass grantee from proxy and jdbc adapter
ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext
= prepareEngine.prepare(context.getRouteContext(),
executorContext.getConnectionOffsets(), context.getExecutionUnits(),
- new
ExecutionGroupReportContext(executorContext.getFederationContext().getProcessId(),
database.getName(), new Grantee("", "")));
+ new
ExecutionGroupReportContext(federationContext.getProcessId(),
database.getName(), new Grantee("", "")));
setParameters(executionGroupContext.getInputGroups());
-
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(executorContext.getFederationContext().getProcessId()).isInterrupted(),
+
ShardingSpherePreconditions.checkState(!ProcessRegistry.getInstance().get(federationContext.getProcessId()).isInterrupted(),
SQLExecutionInterruptedException::new);
- processEngine.executeSQL(executionGroupContext,
executorContext.getFederationContext().getQueryContext());
+ processEngine.executeSQL(executionGroupContext,
federationContext.getQueryContext());
List<QueryResult> queryResults =
jdbcExecutor.execute(executionGroupContext,
callback).stream().map(QueryResult.class::cast).collect(Collectors.toList());
MergeEngine mergeEngine = new MergeEngine(database,
executorContext.getProps(), new ConnectionContext());
MergedResult mergedResult = mergeEngine.merge(queryResults,
queryContext.getSqlStatementContext());
Collection<Statement> statements =
getStatements(executionGroupContext.getInputGroups());
- return new SQLFederationRowEnumerator(mergedResult,
queryResults.get(0).getMetaData(), statements);
+ return new JDBCRowEnumerator(mergedResult,
queryResults.get(0).getMetaData(), statements);
}
};
}
@@ -178,65 +157,21 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
}
}
- private Enumerable<Object> executeByShardingSphereData(final String
databaseName, final String schemaName, final ShardingSphereTable table, final
DatabaseType databaseType) {
- // TODO move this logic to ShardingSphere statistics
- if (databaseType instanceof OpenGaussDatabaseType &&
SYSTEM_CATALOG_TABLES.contains(table.getName())) {
- return createMemoryEnumerator(createSystemCatalogTableData(table),
table, databaseType);
+ private Enumerable<Object> createMemoryEnumerable(final String
databaseName, final String schemaName, final ShardingSphereTable table, final
DatabaseType databaseType) {
+ if (databaseType instanceof OpenGaussDatabaseType &&
EnumerableConstants.SYSTEM_CATALOG_TABLES.contains(table.getName())) {
+ return
createMemoryEnumerator(StatisticsAssembleUtils.assembleTableData(table,
federationContext.getMetaData()), table, databaseType);
}
- Optional<ShardingSphereTableData> tableData =
Optional.ofNullable(statistics.getDatabaseData().get(databaseName)).map(optional
-> optional.getSchemaData().get(schemaName))
-
.map(ShardingSphereSchemaData::getTableData).map(shardingSphereData ->
shardingSphereData.get(table.getName()));
+ Optional<ShardingSphereTableData> tableData =
Optional.ofNullable(statistics.getDatabaseData().get(databaseName))
+ .map(optional ->
optional.getSchemaData().get(schemaName)).map(ShardingSphereSchemaData::getTableData).map(optional
-> optional.get(table.getName()));
return tableData.map(optional -> createMemoryEnumerator(optional,
table, databaseType)).orElseGet(this::createEmptyEnumerable);
}
- private ShardingSphereTableData createSystemCatalogTableData(final
ShardingSphereTable table) {
- ShardingSphereTableData result = new
ShardingSphereTableData(table.getName());
- ShardingSphereMetaData metaData =
executorContext.getFederationContext().getMetaData();
- if (PG_DATABASE.equalsIgnoreCase(table.getName())) {
- appendOpenGaussDatabaseData(result,
metaData.getDatabases().values());
- } else if (PG_TABLES.equalsIgnoreCase(table.getName())) {
- for (ShardingSphereDatabase each :
metaData.getDatabases().values()) {
- appendOpenGaussTableData(result, each.getSchemas());
- }
- } else if (PG_ROLES.equalsIgnoreCase(table.getName())) {
- appendOpenGaussRoleData(result, metaData);
- }
- return result;
- }
-
- private void appendOpenGaussDatabaseData(final ShardingSphereTableData
tableData, final Collection<ShardingSphereDatabase> databases) {
- for (ShardingSphereDatabase each : databases) {
- Object[] rows = new Object[15];
- rows[0] = each.getName();
- rows[11] = DAT_COMPATIBILITY;
- tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
- }
- }
-
- private void appendOpenGaussTableData(final ShardingSphereTableData
tableData, final Map<String, ShardingSphereSchema> schemas) {
- for (Entry<String, ShardingSphereSchema> entry : schemas.entrySet()) {
- for (String each : entry.getValue().getAllTableNames()) {
- Object[] rows = new Object[10];
- rows[0] = entry.getKey();
- rows[1] = each;
- tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
- }
- }
- }
-
- private void appendOpenGaussRoleData(final ShardingSphereTableData
tableData, final ShardingSphereMetaData metaData) {
- for (ShardingSphereUser each :
metaData.getGlobalRuleMetaData().getSingleRule(AuthorityRule.class).getConfiguration().getUsers())
{
- Object[] rows = new Object[27];
- rows[0] = each.getGrantee().getUsername();
- tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
- }
- }
-
private Enumerable<Object> createMemoryEnumerator(final
ShardingSphereTableData tableData, final ShardingSphereTable table, final
DatabaseType databaseType) {
return new AbstractEnumerable<Object>() {
@Override
public Enumerator<Object> enumerator() {
- return new MemoryEnumerator(tableData.getRows(),
table.getColumns().values(), databaseType);
+ return new MemoryRowEnumerator(tableData.getRows(),
table.getColumns().values(), databaseType);
}
};
}
@@ -285,7 +220,7 @@ public final class EnumerableScanExecutor implements
ScanExecutor {
}
List<Object> result = new ArrayList<>();
for (int each : paramIndexes) {
-
result.add(executorContext.getFederationContext().getQueryContext().getParameters().get(each));
+
result.add(federationContext.getQueryContext().getParameters().get(each));
}
return result;
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
similarity index 93%
rename from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
rename to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
index bda010eb027..b639f6917d7 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/SQLFederationRowEnumerator.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/JDBCRowEnumerator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor.row;
+package org.apache.shardingsphere.sqlfederation.executor.enumerator;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@@ -29,10 +29,10 @@ import java.sql.Statement;
import java.util.Collection;
/**
- * SQL federation row enumerator.
+ * JDBC row enumerator.
*/
@RequiredArgsConstructor
-public final class SQLFederationRowEnumerator implements Enumerator<Object> {
+public final class JDBCRowEnumerator implements Enumerator<Object> {
private final MergedResult queryResult;
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
new file mode 100644
index 00000000000..b553cfabd55
--- /dev/null
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/enumerator/MemoryRowEnumerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sqlfederation.executor.enumerator;
+
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
+import org.apache.shardingsphere.sqlfederation.executor.utils.EnumeratorUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Memory row enumerator.
+ */
+public final class MemoryRowEnumerator implements Enumerator<Object> {
+
+ private final Collection<ShardingSphereRowData> rows;
+
+ private final Map<Integer, Class<?>> columnTypes;
+
+ private Iterator<ShardingSphereRowData> iterator;
+
+ private Object current;
+
+ public MemoryRowEnumerator(final Collection<ShardingSphereRowData> rows,
final Collection<ShardingSphereColumn> columns, final DatabaseType
databaseType) {
+ this.rows = rows;
+ columnTypes = EnumeratorUtils.createColumnTypes(new
ArrayList<>(columns), databaseType);
+ iterator = rows.iterator();
+ }
+
+ @Override
+ public Object current() {
+ return current;
+ }
+
+ @Override
+ public boolean moveNext() {
+ if (iterator.hasNext()) {
+ current = EnumeratorUtils.convertToTargetType(columnTypes,
iterator.next().getRows().toArray());
+ return true;
+ }
+ current = null;
+ iterator = rows.iterator();
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void close() {
+ iterator = rows.iterator();
+ current = null;
+ }
+}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/MemoryEnumerator.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
similarity index 52%
rename from
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/MemoryEnumerator.java
rename to
kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
index 889b8bfd566..a646ec2c493 100644
---
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/row/MemoryEnumerator.java
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/EnumeratorUtils.java
@@ -15,57 +15,44 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.sqlfederation.executor.row;
+package org.apache.shardingsphere.sqlfederation.executor.utils;
-import lombok.SneakyThrows;
-import org.apache.calcite.linq4j.Enumerator;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.executor.sql.execute.result.query.impl.driver.jdbc.type.util.ResultSetUtils;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
-import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.util.SQLFederationDataTypeUtils;
import java.sql.SQLFeatureNotSupportedException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
- * Memory enumerator.
+ * Enumerator utilities.
*/
-public final class MemoryEnumerator implements Enumerator<Object> {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class EnumeratorUtils {
- private final Collection<ShardingSphereRowData> rows;
-
- private final DatabaseType databaseType;
-
- private final Map<Integer, Class<?>> columnTypes;
-
- private Iterator<ShardingSphereRowData> iterator;
-
- private Object current;
-
- public MemoryEnumerator(final Collection<ShardingSphereRowData> rows,
final Collection<ShardingSphereColumn> columns, final DatabaseType
databaseType) {
- this.rows = rows;
- this.databaseType = databaseType;
- columnTypes = createColumnTypes(new ArrayList<>(columns));
- iterator = rows.iterator();
- }
-
- private Map<Integer, Class<?>> createColumnTypes(final
List<ShardingSphereColumn> columns) {
+ /**
+ * Create column types.
+ *
+ * @param columns columns
+ * @param databaseType database type
+ * @return column types
+ */
+ public static Map<Integer, Class<?>> createColumnTypes(final
List<ShardingSphereColumn> columns, final DatabaseType databaseType) {
Map<Integer, Class<?>> result = new HashMap<>(columns.size(), 1F);
for (int index = 0; index < columns.size(); index++) {
int finalIndex = index;
- getSqlTypeClass(columns, index).ifPresent(optional ->
result.put(finalIndex, optional));
+ getSQLTypeClass(columns, databaseType, index).ifPresent(optional
-> result.put(finalIndex, optional));
}
return result;
}
- private Optional<Class<?>> getSqlTypeClass(final
List<ShardingSphereColumn> columns, final int index) {
+ private static Optional<Class<?>> getSQLTypeClass(final
List<ShardingSphereColumn> columns, final DatabaseType databaseType, final int
index) {
try {
return
Optional.of(SQLFederationDataTypeUtils.getSqlTypeClass(databaseType,
columns.get(index)));
} catch (final IllegalArgumentException ex) {
@@ -73,48 +60,28 @@ public final class MemoryEnumerator implements
Enumerator<Object> {
}
}
- @Override
- public Object current() {
- return current;
- }
-
- @Override
- public boolean moveNext() {
- if (iterator.hasNext()) {
- current = convertToTargetType(iterator.next().getRows().toArray());
- return true;
- }
- current = null;
- iterator = rows.iterator();
- return false;
- }
-
- @SneakyThrows
- private Object[] convertToTargetType(final Object[] rows) {
+ /**
+ * Convert to target type.
+ *
+ * @param columnTypes column types
+ * @param rows rows
+ * @return target type
+ */
+ public static Object[] convertToTargetType(final Map<Integer, Class<?>>
columnTypes, final Object[] rows) {
Object[] result = new Object[rows.length];
for (int index = 0; index < rows.length; index++) {
if (columnTypes.containsKey(index)) {
- result[index] = convertValue(rows, index);
+ result[index] = convertValue(rows, columnTypes, index);
}
}
return result;
}
- private Object convertValue(final Object[] rows, final int index) {
+ private static Object convertValue(final Object[] rows, final Map<Integer,
Class<?>> columnTypes, final int index) {
try {
return ResultSetUtils.convertValue(rows[index],
columnTypes.get(index));
} catch (final SQLFeatureNotSupportedException ex) {
return rows[index];
}
}
-
- @Override
- public void reset() {
- }
-
- @Override
- public void close() {
- iterator = rows.iterator();
- current = null;
- }
}
diff --git
a/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
new file mode 100644
index 00000000000..731d9abe363
--- /dev/null
+++
b/kernel/sql-federation/executor/src/main/java/org/apache/shardingsphere/sqlfederation/executor/utils/StatisticsAssembleUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sqlfederation.executor.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.authority.rule.AuthorityRule;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
+import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
+import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
+import
org.apache.shardingsphere.sqlfederation.executor.constant.EnumerableConstants;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Statistics assemble utils.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class StatisticsAssembleUtils {
+
+ /**
+ * Assemble table data.
+ *
+ * @param table table
+ * @param metaData meta data
+ * @return ShardingSphere table data
+ */
+ public static ShardingSphereTableData assembleTableData(final
ShardingSphereTable table, final ShardingSphereMetaData metaData) {
+ // TODO move this logic to ShardingSphere statistics
+ ShardingSphereTableData result = new
ShardingSphereTableData(table.getName());
+ if (EnumerableConstants.PG_DATABASE.equalsIgnoreCase(table.getName()))
{
+ assembleOpenGaussDatabaseData(result,
metaData.getDatabases().values());
+ } else if
(EnumerableConstants.PG_TABLES.equalsIgnoreCase(table.getName())) {
+ for (ShardingSphereDatabase each :
metaData.getDatabases().values()) {
+ assembleOpenGaussTableData(result, each.getSchemas());
+ }
+ } else if
(EnumerableConstants.PG_ROLES.equalsIgnoreCase(table.getName())) {
+ assembleOpenGaussRoleData(result, metaData);
+ }
+ return result;
+ }
+
+ private static void assembleOpenGaussDatabaseData(final
ShardingSphereTableData tableData, final Collection<ShardingSphereDatabase>
databases) {
+ for (ShardingSphereDatabase each : databases) {
+ Object[] rows = new Object[15];
+ rows[0] = each.getName();
+ rows[11] = EnumerableConstants.DAT_COMPATIBILITY;
+ tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
+ }
+ }
+
+ private static void assembleOpenGaussTableData(final
ShardingSphereTableData tableData, final Map<String, ShardingSphereSchema>
schemas) {
+ for (Map.Entry<String, ShardingSphereSchema> entry :
schemas.entrySet()) {
+ for (String each : entry.getValue().getAllTableNames()) {
+ Object[] rows = new Object[10];
+ rows[0] = entry.getKey();
+ rows[1] = each;
+ tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
+ }
+ }
+ }
+
+ private static void assembleOpenGaussRoleData(final
ShardingSphereTableData tableData, final ShardingSphereMetaData metaData) {
+ for (ShardingSphereUser each :
metaData.getGlobalRuleMetaData().getSingleRule(AuthorityRule.class).getConfiguration().getUsers())
{
+ Object[] rows = new Object[27];
+ rows[0] = each.getGrantee().getUsername();
+ tableData.getRows().add(new
ShardingSphereRowData(Arrays.asList(rows)));
+ }
+ }
+}
diff --git
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
index d258e1a4ece..ee136b448bb 100644
---
a/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
+++
b/kernel/sql-federation/executor/src/test/java/org/apache/shardingsphere/sqlfederation/executor/enumerable/EnumerableScanExecutorTest.java
@@ -28,7 +28,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereSchemaD
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.sqlfederation.executor.TableScanExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationExecutorContext;
import
org.apache.shardingsphere.sqlfederation.optimizer.context.OptimizerContext;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
import org.junit.jupiter.api.Test;
@@ -50,7 +50,7 @@ class EnumerableScanExecutorTest {
void assertExecuteWithStatistics() {
OptimizerContext optimizerContext = mock(OptimizerContext.class,
RETURNS_DEEP_STUBS);
when(optimizerContext.getParserContext(any()).getDatabaseType()).thenReturn(TypedSPILoader.getService(DatabaseType.class,
"PostgreSQL"));
- TableScanExecutorContext executorContext =
mock(TableScanExecutorContext.class);
+ SQLFederationExecutorContext executorContext =
mock(SQLFederationExecutorContext.class);
when(executorContext.getDatabaseName()).thenReturn("db");
when(executorContext.getSchemaName()).thenReturn("pg_catalog");
ShardingSphereStatistics statistics =
mock(ShardingSphereStatistics.class, RETURNS_DEEP_STUBS);
@@ -64,7 +64,7 @@ class EnumerableScanExecutorTest {
ShardingSphereTable table = mock(ShardingSphereTable.class,
RETURNS_DEEP_STUBS);
when(table.getName()).thenReturn("test");
when(table.getColumns().values()).thenReturn(Collections.singleton(new
ShardingSphereColumn("id", Types.INTEGER, true, false, false, false, true,
false)));
- Enumerable<Object> enumerable = new EnumerableScanExecutor(null, null,
null, optimizerContext, null, executorContext, statistics)
+ Enumerable<Object> enumerable = new EnumerableScanExecutor(null, null,
null, optimizerContext, executorContext, null, null, statistics)
.execute(table, mock(ScanExecutorContext.class));
try (Enumerator<Object> actual = enumerable.enumerator()) {
actual.moveNext();
diff --git
a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
index b5800266389..a30f1d5b9c7 100644
---
a/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
+++
b/kernel/sql-federation/optimizer/src/main/java/org/apache/shardingsphere/sqlfederation/optimizer/metadata/schema/SQLFederationTable.java
@@ -35,9 +35,9 @@ import org.apache.calcite.rel.core.TableModify;
import org.apache.calcite.rel.core.TableModify.Operation;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
@@ -47,10 +47,10 @@ import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.util.SQLFederationDataTypeUtils;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.EmptyRowEnumerator;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutor;
import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.schema.table.ScanExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.optimizer.metadata.util.SQLFederationDataTypeUtils;
import
org.apache.shardingsphere.sqlfederation.optimizer.statistic.SQLFederationStatistic;
import java.lang.reflect.Type;
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
index 6a338863e1b..6a1bc1539e9 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/DatabaseConnector.java
@@ -72,7 +72,7 @@ import
org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import org.apache.shardingsphere.transaction.api.TransactionType;
import
org.apache.shardingsphere.transaction.implicit.ImplicitTransactionCallback;
@@ -231,7 +231,7 @@ public final class DatabaseConnector implements
DatabaseBackendHandler {
ProxyJDBCExecutorCallback callback =
ProxyJDBCExecutorCallbackFactory.newInstance(driverType, protocolType,
database.getResourceMetaData(),
queryContext.getSqlStatementContext().getSqlStatement(), this,
isReturnGeneratedKeys, SQLExecutorExceptionHandler.isExceptionThrown(), true);
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaDataContexts);
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(false, queryContext,
metaDataContexts.getMetaData(),
databaseConnectionManager.getConnectionSession().getProcessId());
+ SQLFederationContext context = new SQLFederationContext(false,
queryContext, metaDataContexts.getMetaData(),
databaseConnectionManager.getConnectionSession().getProcessId());
return
proxySQLExecutor.getSqlFederationEngine().executeQuery(prepareEngine, callback,
context);
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
index 60e35d5bd2e..996664ca6e5 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/rul/PreviewExecutor.java
@@ -60,7 +60,7 @@ import
org.apache.shardingsphere.proxy.backend.context.BackendExecutorContext;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import
org.apache.shardingsphere.sql.parser.sql.dialect.statement.mysql.dml.MySQLInsertStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import java.sql.Connection;
import java.sql.SQLException;
@@ -130,7 +130,7 @@ public final class PreviewExecutor implements
DistSQLQueryExecutor<PreviewStatem
// TODO move dialect MySQLInsertStatement into database type module
@zhangliang
boolean isReturnGeneratedKeys = sqlStatement instanceof
MySQLInsertStatement;
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(isReturnGeneratedKeys,
metaData.getProps());
- SQLFederationExecutorContext context = new
SQLFederationExecutorContext(true, queryContext, metaData,
+ SQLFederationContext context = new SQLFederationContext(true,
queryContext, metaData,
((ProxyDatabaseConnectionManager)
connectionContext.getDatabaseConnectionManager()).getConnectionSession().getProcessId());
federationEngine.executeQuery(prepareEngine,
createPreviewCallback(sqlStatement), context);
return context.getPreviewExecutionUnits();
diff --git
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
index b0b9ab34eb7..be0924e9cd2 100644
---
a/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
+++
b/proxy/backend/type/opengauss/src/main/java/org/apache/shardingsphere/proxy/backend/opengauss/handler/admin/OpenGaussSystemCatalogAdminQueryExecutor.java
@@ -48,7 +48,7 @@ import
org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import
org.apache.shardingsphere.sharding.merge.common.IteratorStreamMergedResult;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sqlfederation.engine.SQLFederationEngine;
-import
org.apache.shardingsphere.sqlfederation.executor.SQLFederationExecutorContext;
+import
org.apache.shardingsphere.sqlfederation.executor.context.SQLFederationContext;
import java.sql.Connection;
import java.sql.ResultSet;
@@ -86,8 +86,8 @@ public final class OpenGaussSystemCatalogAdminQueryExecutor
implements DatabaseA
JDBCExecutor jdbcExecutor = new
JDBCExecutor(BackendExecutorContext.getInstance().getExecutorEngine(),
connectionSession.getConnectionContext());
try (SQLFederationEngine sqlFederationEngine = new
SQLFederationEngine(databaseName, PG_CATALOG, metaDataContexts.getMetaData(),
metaDataContexts.getStatistics(), jdbcExecutor)) {
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection>
prepareEngine = createDriverExecutionPrepareEngine(metaDataContexts,
connectionSession);
- SQLFederationExecutorContext context =
- new SQLFederationExecutorContext(false, new
QueryContext(sqlStatementContext, sql, parameters,
SQLHintUtils.extractHint(sql)), metaDataContexts.getMetaData(),
+ SQLFederationContext context =
+ new SQLFederationContext(false, new
QueryContext(sqlStatementContext, sql, parameters,
SQLHintUtils.extractHint(sql)), metaDataContexts.getMetaData(),
connectionSession.getProcessId());
ShardingSphereDatabase database =
metaDataContexts.getMetaData().getDatabase(databaseName);
ResultSet resultSet =
sqlFederationEngine.executeQuery(prepareEngine,
diff --git
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-combine.xml
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-combine.xml
index 6a429070b6b..9b311f6f968 100644
---
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-combine.xml
+++
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-combine.xml
@@ -232,11 +232,13 @@
<assertion parameters="2500:long, 2500:long"
expected-data-source-name="read_dataset" />
</test-case>
- <test-case sql="SELECT o.user_id FROM t_order o WHERE o.order_id > ? MINUS
ALL SELECT u.user_id FROM t_user u ORDER BY user_id" db-types="openGauss"
scenario-types="db">
+ <!-- TODO move this case to db_tbl_sql_federation -->
+ <!--<test-case sql="SELECT o.user_id FROM t_order o WHERE o.order_id > ?
MINUS ALL SELECT u.user_id FROM t_user u ORDER BY user_id" db-types="openGauss"
scenario-types="db">
<assertion parameters="2500:long"
expected-data-source-name="read_dataset" />
- </test-case>
+ </test-case>-->
- <test-case sql="SELECT o.user_id FROM t_order o WHERE o.order_id > ? MINUS
SELECT u.user_id FROM t_user u ORDER BY user_id" db-types="openGauss"
scenario-types="db">
+ <!-- TODO move this case to db_tbl_sql_federation -->
+ <!--<test-case sql="SELECT o.user_id FROM t_order o WHERE o.order_id > ?
MINUS SELECT u.user_id FROM t_user u ORDER BY user_id" db-types="openGauss"
scenario-types="db">
<assertion parameters="2500:long"
expected-data-source-name="read_dataset" />
- </test-case>
+ </test-case>-->
</integration-test-cases>
diff --git
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-pagination-group-by-order-by.xml
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-pagination-group-by-order-by.xml
index 3dfef92ebdc..8103051e3e9 100644
---
a/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-pagination-group-by-order-by.xml
+++
b/test/e2e/sql/src/test/resources/cases/dql/dql-integration-select-pagination-group-by-order-by.xml
@@ -80,10 +80,11 @@
<assertion parameters="10:int"
expected-data-source-name="read_dataset" />
</test-case>
- <test-case sql="SELECT * FROM t_product p NATURAL JOIN t_product_detail d
WHERE p.product_id > ? ORDER BY p.product_id FETCH NEXT 3 ROW ONLY"
db-types="openGauss" scenario-types="db"
+ <!-- TODO move this test case to db_tbl_sql_federation -->
+ <!--<test-case sql="SELECT * FROM t_product p NATURAL JOIN
t_product_detail d WHERE p.product_id > ? ORDER BY p.product_id FETCH NEXT 3
ROW ONLY" db-types="openGauss" scenario-types="db"
scenario-comments="Test select natural join fetch statement
when use sharding feature and federation executor engine.">
<assertion parameters="10:int"
expected-data-source-name="read_dataset" />
- </test-case>
+ </test-case>-->
<test-case sql="SELECT * FROM t_product p CROSS JOIN t_product_detail d
WHERE p.product_id = ? ORDER BY d.product_id, 7 FETCH NEXT 3 ROW ONLY"
db-types="PostgreSQL,openGauss" scenario-types="db_tbl_sql_federation"
scenario-comments="Test select cross join fetch statement when
use sharding feature and federation executor engine.">