This is an automated email from the ASF dual-hosted git repository.

jhyde pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git

commit 5c477f0dee1b9f709016a63ae8e5b824fb560585
Author: Julian Hyde <[email protected]>
AuthorDate: Thu May 2 23:40:13 2019 -0700

    [CALCITE-3047] In JDBC adapter, expose multiple schemas of the back-end database
    
    We do this by adding JdbcCatalogSchema, an implementation of Schema
    that reads the back-end database catalog and for each schema, creates
    a sub-schema that is a JdbcSchema.
    
    Also add an experimental way to create a Calcite connection with a
    given schema as its root schema (in this case, a JdbcCatalogSchema)
    rather than the usual AbstractSchema.
    
    Rather than calling JdbcSchema.getDataSource(), generated code now
    calls schema.unwrap(DataSource.class). This works for JdbcCatalogSchema
    as well as JdbcSchema.
---
 .../calcite/adapter/jdbc/JdbcCatalogSchema.java    | 147 +++++++++++++++++++++
 .../adapter/jdbc/JdbcToEnumerableConverter.java    |  14 +-
 .../org/apache/calcite/jdbc/CalciteSchema.java     |  19 ++-
 .../org/apache/calcite/test/CalciteAssert.java     |   4 +-
 .../calcite/test/MultiJdbcSchemaJoinTest.java      |  32 +++++
 5 files changed, 203 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java
new file mode 100644
index 0000000..fc9fe3e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcCatalogSchema.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.jdbc;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Schemas;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialectFactory;
+import org.apache.calcite.sql.SqlDialectFactoryImpl;
+import org.apache.calcite.util.BuiltInMethod;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Objects;
+import javax.sql.DataSource;
+
+/**
+ * Schema based upon a JDBC catalog (database).
+ *
+ * <p>This schema does not directly contain tables, but contains a sub-schema
+ * for each schema in the catalog in the back-end. Each of those sub-schemas is
+ * an instance of {@link JdbcSchema}.
+ *
+ * <p>This schema is lazy: it does not compute the list of schema names until
+ * the first call to {@link #getSubSchemaMap()}. Then it creates a
+ * {@link JdbcSchema} for each schema name. Each JdbcSchema will populate its
+ * tables on demand.
+ */
+public class JdbcCatalogSchema extends AbstractSchema {
+  final DataSource dataSource;
+  public final SqlDialect dialect;
+  final JdbcConvention convention;
+  final String catalog;
+
+  /** Sub-schemas by name, lazily initialized. */
+  final Supplier<SubSchemaMap> subSchemaMapSupplier =
+      Suppliers.memoize(() -> computeSubSchemaMap());
+
+  /** Creates a JdbcCatalogSchema. */
+  public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
+      JdbcConvention convention, String catalog) {
+    this.dataSource = Objects.requireNonNull(dataSource);
+    this.dialect = Objects.requireNonNull(dialect);
+    this.convention = Objects.requireNonNull(convention);
+    this.catalog = catalog;
+  }
+
+  public static JdbcCatalogSchema create(
+      SchemaPlus parentSchema,
+      String name,
+      DataSource dataSource,
+      String catalog) {
+    return create(parentSchema, name, dataSource,
+        SqlDialectFactoryImpl.INSTANCE, catalog);
+  }
+
+  public static JdbcCatalogSchema create(
+      SchemaPlus parentSchema,
+      String name,
+      DataSource dataSource,
+      SqlDialectFactory dialectFactory,
+      String catalog) {
+    final Expression expression =
+        parentSchema != null
+            ? Schemas.subSchemaExpression(parentSchema, name,
+                JdbcCatalogSchema.class)
+            : Expressions.call(DataContext.ROOT,
+                BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method);
+    final SqlDialect dialect =
+        JdbcSchema.createDialect(dialectFactory, dataSource);
+    final JdbcConvention convention =
+        JdbcConvention.of(dialect, expression, name);
+    return new JdbcCatalogSchema(dataSource, dialect, convention, catalog);
+  }
+
+  private SubSchemaMap computeSubSchemaMap() {
+    final ImmutableMap.Builder<String, Schema> builder =
+        ImmutableMap.builder();
+    String defaultSchemaName;
+    try (Connection connection = dataSource.getConnection();
+         ResultSet resultSet =
+             connection.getMetaData().getSchemas(catalog, null)) {
+      defaultSchemaName = connection.getSchema();
+      while (resultSet.next()) {
+        final String schemaName = resultSet.getString(1);
+        builder.put(schemaName,
+            new JdbcSchema(dataSource, dialect, convention, catalog, schemaName));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    return new SubSchemaMap(defaultSchemaName, builder.build());
+  }
+
+  @Override protected Map<String, Schema> getSubSchemaMap() {
+    return subSchemaMapSupplier.get().map;
+  }
+
+  /** Returns the name of the default sub-schema. */
+  public String getDefaultSubSchemaName() {
+    return subSchemaMapSupplier.get().defaultSchemaName;
+  }
+
+  /** Returns the data source. */
+  public DataSource getDataSource() {
+    return dataSource;
+  }
+
+  /** Contains sub-schemas by name, and the name of the default schema. */
+  private static class SubSchemaMap {
+    final String defaultSchemaName;
+    final ImmutableMap<String, Schema> map;
+
+    private SubSchemaMap(String defaultSchemaName,
+        ImmutableMap<String, Schema> map) {
+      this.defaultSchemaName = defaultSchemaName;
+      this.map = map;
+    }
+  }
+}
+
+// End JdbcCatalogSchema.java
diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java
index 7164466..03eb64b 100644
--- a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java
+++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcToEnumerableConverter.java
@@ -57,6 +57,7 @@ import java.util.Calendar;
 import java.util.List;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
+import javax.sql.DataSource;
 
 /**
  * Relational expression representing a scan of a table in a JDBC data source.
@@ -171,22 +172,15 @@ public class JdbcToEnumerableConverter
       enumerable = builder0.append("enumerable",
           Expressions.call(
               BuiltInMethod.RESULT_SET_ENUMERABLE_OF_PREPARED.method,
-              Expressions.call(
-                  Schemas.unwrap(jdbcConvention.expression,
-                      JdbcSchema.class),
-                  BuiltInMethod.JDBC_SCHEMA_DATA_SOURCE.method),
+              Schemas.unwrap(jdbcConvention.expression, DataSource.class),
               sql_,
               rowBuilderFactory_,
               preparedStatementConsumer_));
     } else {
-      enumerable = builder0.append(
-          "enumerable",
+      enumerable = builder0.append("enumerable",
           Expressions.call(
               BuiltInMethod.RESULT_SET_ENUMERABLE_OF.method,
-              Expressions.call(
-                  Schemas.unwrap(jdbcConvention.expression,
-                      JdbcSchema.class),
-                  BuiltInMethod.JDBC_SCHEMA_DATA_SOURCE.method),
+              Schemas.unwrap(jdbcConvention.expression, DataSource.class),
               sql_,
               rowBuilderFactory_));
     }
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
index fa25f69..7445a6b 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteSchema.java
@@ -16,6 +16,8 @@
  */
 package org.apache.calcite.jdbc;
 
+import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema;
+import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.linq4j.function.Experimental;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.materialize.Lattice;
@@ -47,6 +49,7 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Objects;
 import java.util.Set;
+import javax.sql.DataSource;
 
 /**
  * Schema.
@@ -500,8 +503,14 @@ public abstract class CalciteSchema {
    */
   public static CalciteSchema createRootSchema(boolean addMetadataSchema,
       boolean cache, String name) {
+    final Schema rootSchema = new CalciteConnectionImpl.RootSchema();
+    return createRootSchema(addMetadataSchema, cache, name, rootSchema);
+  }
+
+  @Experimental
+  public static CalciteSchema createRootSchema(boolean addMetadataSchema,
+      boolean cache, String name, Schema schema) {
     CalciteSchema rootSchema;
-    final Schema schema = new CalciteConnectionImpl.RootSchema();
     if (cache) {
       rootSchema = new CachingCalciteSchema(null, schema, name);
     } else {
@@ -695,6 +704,14 @@ public abstract class CalciteSchema {
       if (clazz.isInstance(CalciteSchema.this.schema)) {
         return clazz.cast(CalciteSchema.this.schema);
       }
+      if (clazz == DataSource.class) {
+        if (schema instanceof JdbcSchema) {
+          return clazz.cast(((JdbcSchema) schema).getDataSource());
+        }
+        if (schema instanceof JdbcCatalogSchema) {
+          return clazz.cast(((JdbcCatalogSchema) schema).getDataSource());
+        }
+      }
       throw new ClassCastException("not a " + clazz);
     }
 
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
index 943b2fd..521d382 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteAssert.java
@@ -1547,7 +1547,7 @@ public class CalciteAssert {
 
     public AssertQuery planUpdateHasSql(String expected, int count) {
       ensurePlan(checkUpdateCount(count));
-      expected = "getDataSource(), \""
+      expected = ".unwrap(javax.sql.DataSource.class), \""
           + expected.replace("\\", "\\\\")
           .replace("\"", "\\\"")
           .replaceAll("\n", "\\\\n")
@@ -1562,7 +1562,7 @@ public class CalciteAssert {
 
     public AssertQuery planHasSql(String expected) {
       return planContains(
-          "getDataSource(), \""
+          ".unwrap(javax.sql.DataSource.class), \""
               + expected.replace("\\", "\\\\")
               .replace("\"", "\\\"")
               .replaceAll("\n", "\\\\n")
diff --git a/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java b/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java
index 3346c61..d3a3009 100644
--- a/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java
+++ b/core/src/test/java/org/apache/calcite/test/MultiJdbcSchemaJoinTest.java
@@ -17,11 +17,17 @@
 package org.apache.calcite.test;
 
 import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.adapter.jdbc.JdbcCatalogSchema;
 import org.apache.calcite.adapter.jdbc.JdbcSchema;
 import org.apache.calcite.config.CalciteSystemProperty;
 import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteJdbc41Factory;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.jdbc.Driver;
 import org.apache.calcite.schema.SchemaPlus;
 
+import org.apache.commons.dbcp2.BasicDataSource;
+
 import com.google.common.collect.Sets;
 
 import org.junit.Test;
@@ -33,11 +39,13 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.sql.DataSource;
 
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -88,6 +96,30 @@ public class MultiJdbcSchemaJoinTest {
     test();
   }
 
+  /** Tests {@link org.apache.calcite.adapter.jdbc.JdbcCatalogSchema}. */
+  @Test public void test3() throws SQLException {
+    final BasicDataSource dataSource = new BasicDataSource();
+    dataSource.setUrl(TempDb.INSTANCE.getUrl());
+    dataSource.setUsername("");
+    dataSource.setPassword("");
+    final JdbcCatalogSchema schema =
+        JdbcCatalogSchema.create(null, "", dataSource, "PUBLIC");
+    assertThat(schema.getSubSchemaNames(),
+        is(Sets.newHashSet("INFORMATION_SCHEMA", "PUBLIC", "SYSTEM_LOBS")));
+    final CalciteSchema rootSchema0 =
+        CalciteSchema.createRootSchema(false, false, "", schema);
+    final Driver driver = new Driver();
+    final CalciteJdbc41Factory factory = new CalciteJdbc41Factory();
+    final String sql = "select count(*) as c from information_schema.schemata";
+    try (Connection connection =
+             factory.newConnection(driver, factory,
+                 "jdbc:calcite:", new Properties(), rootSchema0, null);
+         Statement stmt3 = connection.createStatement();
+         ResultSet rs = stmt3.executeQuery(sql)) {
+      assertThat(CalciteAssert.toString(rs), equalTo("C=3\n"));
+    }
+  }
+
   private Connection setup() throws SQLException {
     // Create a jdbc database & table
     final String db = TempDb.INSTANCE.getUrl();

Reply via email to