diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index 5fd08806c525..f5613c9b56bf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -19,12 +19,15 @@
 
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -86,9 +89,17 @@
 
   abstract boolean autoUdfUdafLoad();
 
+  abstract Map<String, TableProvider> tableProviderMap();
+
+  abstract @Nullable String defaultTableProvider();
+
   @Override
   public PCollection<Row> expand(PInput input) {
     BeamSqlEnv sqlEnv = BeamSqlEnv.readOnly(PCOLLECTION_NAME, toTableMap(input));
+    tableProviderMap().forEach(sqlEnv::addSchema);
+    if (defaultTableProvider() != null) {
+      sqlEnv.setCurrentSchema(defaultTableProvider());
+    }
 
     // TODO: validate duplicate functions.
     sqlEnv.loadBeamBuiltinFunctions();
@@ -154,10 +165,21 @@ public static SqlTransform query(String queryString) {
         .setQueryString(queryString)
         .setUdafDefinitions(Collections.emptyList())
         .setUdfDefinitions(Collections.emptyList())
+        .setTableProviderMap(Collections.emptyMap())
         .setAutoUdfUdafLoad(false)
         .build();
   }
 
+  public SqlTransform withTableProvider(String name, TableProvider tableProvider) {
+    Map<String, TableProvider> map = new HashMap<>(tableProviderMap());
+    map.put(name, tableProvider);
+    return toBuilder().setTableProviderMap(ImmutableMap.copyOf(map)).build();
+  }
+
+  public SqlTransform withDefaultTableProvider(String name, TableProvider tableProvider) {
+    return withTableProvider(name, tableProvider).toBuilder().setDefaultTableProvider(name).build();
+  }
+
   public SqlTransform withAutoUdfUdafLoad(boolean autoUdfUdafLoad) {
     return toBuilder().setAutoUdfUdafLoad(autoUdfUdafLoad).build();
   }
@@ -215,6 +237,10 @@ static Builder builder() {
 
     abstract Builder setAutoUdfUdafLoad(boolean autoUdfUdafLoad);
 
+    abstract Builder setTableProviderMap(Map<String, TableProvider> tableProviderMap);
+
+    abstract Builder setDefaultTableProvider(@Nullable String defaultTableProvider);
+
     abstract SqlTransform build();
   }
 
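For context, a minimal usage sketch of the new hooks, using the illustrative
names from the tests added at the end of this patch (extraSchema, extraTable,
extraInputProvider):

    // Mount an extra TableProvider as the sub-schema "extraSchema" and
    // query it with a qualified table name.
    PCollection<Row> result =
        input.apply(
            SqlTransform.query("SELECT f_int, f_string FROM extraSchema.extraTable")
                .withTableProvider("extraSchema", extraInputProvider));
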
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
index 65b2f1556555..83993600f941 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
@@ -54,9 +54,13 @@
 class BeamQueryPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
 
-  private final FrameworkConfig config;
+  private final JdbcConnection connection;
 
   BeamQueryPlanner(JdbcConnection connection) {
+    this.connection = connection;
+  }
+
+  public FrameworkConfig config() {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -85,17 +89,16 @@
     final SqlOperatorTable opTab0 =
         connection.config().fun(SqlOperatorTable.class, SqlStdOperatorTable.instance());
 
-    this.config =
-        Frameworks.newConfigBuilder()
-            .parserConfig(parserConfig.build())
-            .defaultSchema(defaultSchema)
-            .traitDefs(traitDefs)
-            .context(Contexts.of(connection.config()))
-            .ruleSets(BeamRuleSets.getRuleSets())
-            .costFactory(null)
-            .typeSystem(connection.getTypeFactory().getTypeSystem())
-            .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
-            .build();
+    return Frameworks.newConfigBuilder()
+        .parserConfig(parserConfig.build())
+        .defaultSchema(defaultSchema)
+        .traitDefs(traitDefs)
+        .context(Contexts.of(connection.config()))
+        .ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null)
+        .typeSystem(connection.getTypeFactory().getTypeSystem())
+        .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
+        .build();
   }
 
   /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
@@ -141,6 +144,6 @@ public BeamRelNode convertToBeamRel(String sqlStatement)
   }
 
   private Planner getPlanner() {
-    return Frameworks.getPlanner(config);
+    return Frameworks.getPlanner(config());
   }
 }
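Note the design change above: the FrameworkConfig, previously frozen in the
constructor, is now rebuilt on each config() call, so the default schema and
operator table are captured from the live connection per query. Presumably
this is what lets schemas registered after the planner exists take effect; a
sketch of the sequencing this enables (names are illustrative):

    BeamQueryPlanner planner = new BeamQueryPlanner(connection);
    // The schema is registered after the planner was constructed ...
    connection.setSchema("extraSchema", extraInputProvider);
    // ... yet is still visible, because config() is recomputed per query:
    BeamRelNode rel = planner.convertToBeamRel("SELECT f_int FROM extraSchema.extraTable");
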
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 5841efb64f8c..e1589bdea81e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl;
 
 import java.lang.reflect.Method;
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
@@ -80,6 +81,18 @@ private void registerBuiltinUdf(Map<String, List<Method>> methods) {
     }
   }
 
+  public void addSchema(String name, TableProvider tableProvider) {
+    connection.setSchema(name, tableProvider);
+  }
+
+  public void setCurrentSchema(String name) {
+    try {
+      connection.setSchema(name);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /** Register a UDF function which can be used in SQL expression. */
   public void registerUdf(String functionName, Class<?> clazz, String method) {
     connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method));
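These two methods are the hooks SqlTransform.expand calls (first file above):
addSchema mounts a TableProvider as a named sub-schema of the connection, and
setCurrentSchema makes a schema the default for unqualified table names,
wrapping the checked SQLException presumably because expand cannot throw it.
A sketch:

    sqlEnv.addSchema("extraSchema", extraInputProvider);
    sqlEnv.setCurrentSchema("extraSchema");  // unqualified names now resolve here
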
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
index a4a7f479c10b..f3327d857347 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -64,6 +64,7 @@ private JdbcConnection(CalciteConnection connection) throws SQLException {
 
     JdbcConnection jdbcConnection = new JdbcConnection(connection);
     jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
+    jdbcConnection.getRootSchema().setCacheEnabled(false);
     jdbcConnection.setSchema(
         connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
     return jdbcConnection;
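Disabling the cache on the root schema looks like the enabling change here:
Calcite caches sub-schema lookups by default, so a schema registered on the
connection after setup (as SqlTransform.expand now does) might otherwise be
invisible at planning time.
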
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMultipleSchemasTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMultipleSchemasTest.java
new file mode 100644
index 000000000000..5d9e5bf2254c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlMultipleSchemasTest.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for querying across multiple schemas and table providers. */
+public class BeamSqlMultipleSchemasTest {
+
+  private static final Schema ROW_SCHEMA =
+      Schema.builder().addInt32Field("f_int").addStringField("f_string").build();
+
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException exceptions = ExpectedException.none();
+
+  @Test
+  public void testSelectFromQualifiedPCollection() {
+    PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
+
+    PCollection<Row> result =
+        input.apply(SqlTransform.query("SELECT f_int, f_string FROM beam.PCOLLECTION"));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "strstr"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSelectFromUnqualifiedPCollection() {
+    PCollection<Row> input = pipeline.apply(create(row(1, "strstr")));
+
+    PCollection<Row> result =
+        input.apply(SqlTransform.query("SELECT f_int, f_string FROM PCOLLECTION"));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "strstr"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSelectFromExtraSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query("SELECT f_int, f_string FROM extraSchema.extraTable")
+                .withTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testOverrideUnqualifiedMainSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query("SELECT f_int, f_string FROM extraTable")
+                .withTableProvider("beam", extraInputProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testOverrideQualifiedMainSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query("SELECT f_int, f_string FROM beam.extraTable")
+                .withTableProvider("beam", extraInputProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSetDefaultUnqualifiedSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query("SELECT f_int, f_string FROM extraTable")
+                .withDefaultTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSetDefaultUnqualifiedSchemaAndJoin() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query(
+                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
+                        + "FROM extraTable AS extra \n"
+                        + "   INNER JOIN \n"
+                        + " beam.PCOLLECTION AS main \n"
+                        + "   ON main.f_int = extra.f_int")
+                .withDefaultTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testSetDefaultQualifiedSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query("SELECT f_int, f_string FROM extraSchema.extraTable")
+                .withDefaultTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result).containsInAnyOrder(row(1, "_extra_table_1"), row(2, "_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testJoinWithExtraSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query(
+                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
+                        + "FROM extraSchema.extraTable AS extra \n"
+                        + "   INNER JOIN \n"
+                        + " PCOLLECTION AS main \n"
+                        + "   ON main.f_int = extra.f_int")
+                .withTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
+    pipeline.run();
+  }
+
+  @Test
+  public void testJoinQualifiedMainWithExtraSchema() {
+    PCollection<Row> inputMain =
+        pipeline.apply("mainInput", create(row(1, "pcollection_1"), row(2, "pcollection_2")));
+
+    PCollection<Row> inputExtra =
+        pipeline.apply("extraInput", create(row(1, "_extra_table_1"), row(2, "_extra_table_2")));
+
+    TableProvider extraInputProvider = extraTableProvider("extraTable", inputExtra);
+
+    PCollection<Row> result =
+        inputMain.apply(
+            SqlTransform.query(
+                    "SELECT extra.f_int, main.f_string || extra.f_string AS f_string \n"
+                        + "FROM extraSchema.extraTable AS extra \n"
+                        + "   INNER JOIN \n"
+                        + " beam.PCOLLECTION AS main \n"
+                        + "   ON main.f_int = extra.f_int")
+                .withTableProvider("extraSchema", extraInputProvider));
+
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(1, "pcollection_1_extra_table_1"), row(2, "pcollection_2_extra_table_2"));
+    pipeline.run();
+  }
+
+  private TableProvider extraTableProvider(String tableName, PCollection<Row> rows) {
+    return new ReadOnlyTableProvider(
+        "testExtraTableProvider", ImmutableMap.of(tableName, new BeamPCollectionTable<>(rows)));
+  }
+
+  private Row row(int fIntValue, String fStringValue) {
+    return Row.withSchema(ROW_SCHEMA).addValues(fIntValue, fStringValue).build();
+  }
+
+  private PTransform<PBegin, PCollection<Row>> create(Row... rows) {
+    return Create.of(Arrays.asList(rows)).withRowSchema(ROW_SCHEMA);
+  }
+}
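
Taken together, the tests pin down the resolution rules: unqualified table
names resolve against the current default schema ("beam" unless
withDefaultTableProvider replaces it), while qualified names resolve against
the schema registered under that name. A condensed sketch of the
default-provider case (names as in the tests):

    input.apply(
        SqlTransform.query("SELECT f_int, f_string FROM extraTable")
            // "extraTable" resolves in "extraSchema"; beam.PCOLLECTION
            // remains reachable through its qualified name.
            .withDefaultTableProvider("extraSchema", extraInputProvider));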