This is an automated email from the ASF dual-hosted git repository. jhyde pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/calcite.git
commit 5c477f0dee1b9f709016a63ae8e5b824fb560585 Author: Julian Hyde <[email protected]> AuthorDate: Thu May 2 23:40:13 2019 -0700 [CALCITE-3047] In JDBC adapter, expose multiple schemas of the back-end database We do this by adding JdbcCatalogSchema, an implementation of Schema that reads the back-end database catalog and for each schema, creates a sub-schema that is a JdbcSchema. Also add an experimental way to create a Calcite connection with a given schema as its root schema (in this case, a JdbcCatalogSchema) rather than the usual AbstractSchema. Rather than calling JdbcSchema.getDataSource(), generated code now calls schema.unwrap(DataSource.class). This works for JdbcCatalogSchema as well as JdbcSchema. --- .../calcite/adapter/jdbc/JdbcCatalogSchema.java | 147 +++++++++++++++++++++ .../adapter/jdbc/JdbcToEnumerableConverter.java | 14 +- .../org/apache/calcite/jdbc/CalciteSchema.java | 19 ++- .../org/apache/calcite/test/CalciteAssert.java | 4 +- .../calcite/test/MultiJdbcSchemaJoinTest.java | 32 +++++ 5 files changed, 203 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java new file mode 100644 index 0000000..fc9fe3e --- /dev/null +++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.adapter.jdbc; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.SqlDialect; +import org.apache.calcite.sql.SqlDialectFactory; +import org.apache.calcite.sql.SqlDialectFactoryImpl; +import org.apache.calcite.util.BuiltInMethod; + +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Objects; +import javax.sql.DataSource; + +/** + * Schema based upon a JDBC catalog (database). + * + * <p>This schema does not directly contain tables, but contains a sub-schema + * for each schema in the catalog in the back-end. Each of those sub-schemas is + * an instance of {@link JdbcSchema}. + * + * <p>This schema is lazy: it does not compute the list of schema names until + * the first call to {@link #getSubSchemaMap()}. Then it creates a + * {@link JdbcSchema} for each schema name. Each JdbcSchema will populate its + * tables on demand. + */ +public class JdbcCatalogSchema extends AbstractSchema { + final DataSource dataSource; + public final SqlDialect dialect; + final JdbcConvention convention; + final String catalog; + + /** Sub-schemas by name, lazily initialized. */ + final Supplier<SubSchemaMap> subSchemaMapSupplier = + Suppliers.memoize(() -> computeSubSchemaMap()); + + /** Creates a JdbcCatalogSchema. */ + public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect, + JdbcConvention convention, String catalog) { + this.dataSource = Objects.requireNonNull(dataSource); + this.dialect = Objects.requireNonNull(dialect); + this.convention = Objects.requireNonNull(convention); + this.catalog = catalog; + } + + public static JdbcCatalogSchema create( + SchemaPlus parentSchema, + String name, + DataSource dataSource, + String catalog) { + return create(parentSchema, name, dataSource, + SqlDialectFactoryImpl.INSTANCE, catalog); + } + + public static JdbcCatalogSchema create( + SchemaPlus parentSchema, + String name, + DataSource dataSource, + SqlDialectFactory dialectFactory, + String catalog) { + final Expression expression = + parentSchema != null + ? Schemas.subSchemaExpression(parentSchema, name, + JdbcCatalogSchema.class) + : Expressions.call(DataContext.ROOT, + BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method); + final SqlDialect dialect = + JdbcSchema.createDialect(dialectFactory, dataSource); + final JdbcConvention convention = + JdbcConvention.of(dialect, expression, name); + return new JdbcCatalogSchema(dataSource, dialect, convention, catalog); + } + + private SubSchemaMap computeSubSchemaMap() { + final ImmutableMap.Builder<String, Schema> builder = + ImmutableMap.builder(); + String defaultSchemaName; + try (Connection connection = dataSource.getConnection(); + ResultSet resultSet = + connection.getMetaData().getSchemas(catalog, null)) { + defaultSchemaName = connection.getSchema(); + while (resultSet.next()) { + final String schemaName = resultSet.getString(1); + builder.put(schemaName, + new JdbcSchema(dataSource, dialect, convention, catalog, schemaName)); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + return new SubSchemaMap(defaultSchemaName, builder.build()); + } + + @Override protected Map<String, Schema> getSubSchemaMap() { + return subSchemaMapSupplier.get().map; + } + + /** Returns the name of the default sub-schema. */ + public String getDefaultSubSchemaName() { + return subSchemaMapSupplier.get().defaultSchemaName; + } + + /** Returns the data source. */ + public DataSource getDataSource() { + return dataSource; + } + + /** Contains sub-schemas by name, and the name of the default schema. */ + private static class SubSchemaMap { + final String defaultSchemaName; + final ImmutableMap<String, Schema> map; + + private SubSchemaMap(String defaultSchemaName, + ImmutableMap<String, Schema> map) { + this.defaultSchemaName = defaultSchemaName; + this.map = map; + } + } +} + +// End JdbcCatalogSchema.java diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java index 7164466..03eb64b 100644 --- a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java +++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java @@ -57,6 +57,7 @@ import java.util.Calendar; import java.util.List; import java.util.TimeZone; import java.util.stream.Collectors; +import javax.sql.DataSource; /** * Relational expression representing a scan of a table in a JDBC data source. @@ -171,22 +172,15 @@ public class JdbcToEnumerableConverter enumerable = builder0.append("enumerable", Expressions.call( BuiltInMethod.RESULT_SET_ENUMERABLE_OF_PREPARED.method, - Expressions.call( - Schemas.unwrap(jdbcConvention.expression, - JdbcSchema.class), - BuiltInMethod.JDBC_SCHEMA_DATA_SOURCE.method), + Schemas.unwrap(jdbcConvention.expression, DataSource.class), sql_, rowBuilderFactory_, preparedStatementConsumer_)); } else { - enumerable = builder0.append( - "enumerable", + enumerable = builder0.append("enumerable", Expressions.call( BuiltInMethod.RESULT_SET_ENUMERABLE_OF.method, - Expressions.call( - Schemas.unwrap(jdbcConvention.expression, - JdbcSchema.class), - BuiltInMethod.JDBC_SCHEMA_DATA_SOURCE.method), + Schemas.unwrap(jdbcConvention.expression, DataSource.class), sql_, rowBuilderFactory_)); } diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java index fa25f69..7445a6b 100644 --- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java +++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.jdbc; +import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema; +import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.linq4j.function.Experimental; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.materialize.Lattice; @@ -47,6 +49,7 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Objects; import java.util.Set; +import javax.sql.DataSource; /** * Schema. @@ -500,8 +503,14 @@ public abstract class CalciteSchema { */ public static CalciteSchema createRootSchema(boolean addMetadataSchema, boolean cache, String name) { + final Schema rootSchema = new CalciteConnectionImpl.RootSchema(); + return createRootSchema(addMetadataSchema, cache, name, rootSchema); + } + + @Experimental + public static CalciteSchema createRootSchema(boolean addMetadataSchema, + boolean cache, String name, Schema schema) { CalciteSchema rootSchema; - final Schema schema = new CalciteConnectionImpl.RootSchema(); if (cache) { rootSchema = new CachingCalciteSchema(null, schema, name); } else { @@ -695,6 +704,14 @@ public abstract class CalciteSchema { if (clazz.isInstance(CalciteSchema.this.schema)) { return clazz.cast(CalciteSchema.this.schema); } + if (clazz == DataSource.class) { + if (schema instanceof JdbcSchema) { + return clazz.cast(((JdbcSchema) schema).getDataSource()); + } + if (schema instanceof JdbcCatalogSchema) { + return clazz.cast(((JdbcCatalogSchema) schema).getDataSource()); + } + } throw new ClassCastException("not a " + clazz); } diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java index 943b2fd..521d382 100644 --- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java +++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java @@ -1547,7 +1547,7 @@ public class CalciteAssert { public AssertQuery planUpdateHasSql(String expected, int count) { ensurePlan(checkUpdateCount(count)); - expected = "getDataSource(), \"" + expected = ".unwrap(javax.sql.DataSource.class), \"" + expected.replace("\\", "\\\\") .replace("\"", "\\\"") .replaceAll("\n", "\\\\n") @@ -1562,7 +1562,7 @@ public class CalciteAssert { public AssertQuery planHasSql(String expected) { return planContains( - "getDataSource(), \"" + ".unwrap(javax.sql.DataSource.class), \"" + expected.replace("\\", "\\\\") .replace("\"", "\\\"") .replaceAll("\n", "\\\\n") diff --git a/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java b/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java index 3346c61..d3a3009 100644 --- a/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java +++ b/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java @@ -17,11 +17,17 @@ package org.apache.calcite.test; import org.apache.calcite.adapter.java.ReflectiveSchema; +import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema; import org.apache.calcite.adapter.jdbc.JdbcSchema; import org.apache.calcite.config.CalciteSystemProperty; import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.jdbc.CalciteJdbc41Factory; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.jdbc.Driver; import org.apache.calcite.schema.SchemaPlus; +import org.apache.commons.dbcp2.BasicDataSource; + import com.google.common.collect.Sets; import org.junit.Test; @@ -33,11 +39,13 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.HashSet; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -88,6 +96,30 @@ public class MultiJdbcSchemaJoinTest { test(); } + /** Tests {@link org.apache.calcite.adapter.jdbc.JdbcCatalogSchema}. */ + @Test public void test3() throws SQLException { + final BasicDataSource dataSource = new BasicDataSource(); + dataSource.setUrl(TempDb.INSTANCE.getUrl()); + dataSource.setUsername(""); + dataSource.setPassword(""); + final JdbcCatalogSchema schema = + JdbcCatalogSchema.create(null, "", dataSource, "PUBLIC"); + assertThat(schema.getSubSchemaNames(), + is(Sets.newHashSet("INFORMATION_SCHEMA", "PUBLIC", "SYSTEM_LOBS"))); + final CalciteSchema rootSchema0 = + CalciteSchema.createRootSchema(false, false, "", schema); + final Driver driver = new Driver(); + final CalciteJdbc41Factory factory = new CalciteJdbc41Factory(); + final String sql = "select count(*) as c from information_schema.schemata"; + try (Connection connection = + factory.newConnection(driver, factory, + "jdbc:calcite:", new Properties(), rootSchema0, null); + Statement stmt3 = connection.createStatement(); + ResultSet rs = stmt3.executeQuery(sql)) { + assertThat(CalciteAssert.toString(rs), equalTo("C=3\n")); + } + } + private Connection setup() throws SQLException { // Create a jdbc database & table final String db = TempDb.INSTANCE.getUrl();
