This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch revert-8822-bigquery-rowcount in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7b7c21544b50b1c8ac96f6c88362a0e266b6d55e Author: Anton Kedin <33067037+ake...@users.noreply.github.com> AuthorDate: Mon Jun 17 16:48:30 2019 -0700 Revert "[BEAM-7513] Adding Row Count for Bigquery Table" --- .../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 7 +- .../beam/sdk/extensions/sql/BeamSqlTable.java | 7 - .../beam/sdk/extensions/sql/SqlTransform.java | 2 - .../sql/impl/BeamCalciteSchemaFactory.java | 12 +- .../sdk/extensions/sql/impl/BeamCalciteTable.java | 23 --- .../sql/impl/BeamRowCountStatistics.java | 44 ------ .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 26 +--- .../beam/sdk/extensions/sql/impl/JdbcDriver.java | 34 +---- .../sql/meta/provider/bigquery/BigQueryTable.java | 39 ----- .../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 7 +- .../sdk/extensions/sql/impl/JdbcDriverTest.java | 31 ++-- .../extensions/sql/impl/parser/BeamDDLTest.java | 7 +- ...BeamSqlBuiltinFunctionsIntegrationTestBase.java | 7 +- .../meta/provider/bigquery/BigQueryRowCountIT.java | 161 --------------------- .../meta/provider/bigquery/BigQueryTestTable.java | 45 ------ .../bigquery/BigQueryTestTableProvider.java | 71 --------- .../sql/meta/provider/pubsub/PubsubJsonIT.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 23 --- 18 files changed, 29 insertions(+), 519 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 8bdb1bf..5e44c6c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */ @Experimental @@ -35,17 +34,15 @@ public class BeamSqlCli { private MetaStore metaStore; public BeamSqlCli metaStore(MetaStore metaStore) { - return metaStore(metaStore, false, PipelineOptionsFactory.create()); + return metaStore(metaStore, false); } - public BeamSqlCli metaStore( - MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) { + public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) { this.metaStore = metaStore; BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore); if (autoLoadUdfUdaf) { builder.autoLoadUserDefinedFunctions(); } - builder.setPipelineOptions(pipelineOptions); this.env = builder.build(); return this; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 63f7158..14f1b80 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.extensions.sql; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -38,9 +36,4 @@ public interface BeamSqlTable { /** Get the schema info of the table. */ Schema getSchema(); - - /** Estimates the number of rows or returns null if there is no estimation. */ - default BeamRowCountStatistics getRowCount(PipelineOptions options) { - return BeamRowCountStatistics.UNKNOWN; - } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java index afa4438..e45daca 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java @@ -118,8 +118,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> sqlEnvBuilder.setQueryPlannerClassName( input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName()); - sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions()); - BeamSqlEnv sqlEnv = sqlEnvBuilder.build(); return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString())); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java index 339810c..f6a016b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java @@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table; * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection * properties). * - * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider, - * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers. + * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid + * loading all available table providers. */ class BeamCalciteSchemaFactory { @@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory { } /** - * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider - * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This - * schema is expected to be replaced by an actual functional schema by the same code that - * specified this override in the first place. + * This is the override to create an empty schema, used in {@link + * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is + * expected to be replaced by an actual functional schema by the same code that specified this + * override in the first place. */ public static class Empty extends InitialEmptySchema implements SchemaFactory { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index e53cb04..e800c82 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -21,11 +21,9 @@ import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.calcite.adapter.java.AbstractQueryableTable; import org.apache.calcite.linq4j.QueryProvider; @@ -40,8 +38,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ModifiableTable; import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.TranslatableTable; /** Adapter from {@link BeamSqlTable} to a calcite Table. */ @@ -66,25 +62,6 @@ public class BeamCalciteTable extends AbstractQueryableTable } @Override - public Statistic getStatistic() { - /* - Changing class loader is required for the JDBC path. It is similar to what done in - {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable }. - */ - final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); - try { - Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); - BeamRowCountStatistics beamStatistics = - beamTable.getRowCount(BeamEnumerableConverter.createPipelineOptions(pipelineOptions)); - return beamStatistics.isUnknown() - ? Statistics.UNKNOWN - : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of()); - } finally { - Thread.currentThread().setContextClassLoader(originalClassLoader); - } - } - - @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java deleted file mode 100644 index ac0431d..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.impl; - -import java.io.Serializable; -import java.math.BigInteger; - -/** This class stores row count statistics. */ -public class BeamRowCountStatistics implements Serializable { - public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null); - private final BigInteger rowCount; - - private BeamRowCountStatistics(BigInteger rowCount) { - this.rowCount = rowCount; - } - - public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) { - return new BeamRowCountStatistics(rowCount); - } - - /** Is true if the row count cannot be estimated. */ - public boolean isUnknown() { - return rowCount == null; - } - - public BigInteger getRowCount() { - return rowCount; - } -} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index ae4238d..02b3e69 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -38,8 +38,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; @@ -68,26 +66,14 @@ public class BeamSqlEnv { return new BeamSqlEnvBuilder(tableProvider); } - /** - * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty - * Pipeline Options. It should only be used in tests. - */ public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) { return withTableProvider(new ReadOnlyTableProvider(tableType, tables)); } - /** - * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty - * Pipeline Options. It should only be used in tests. - */ public static BeamSqlEnv withTableProvider(TableProvider tableProvider) { - return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build(); + return builder(tableProvider).build(); } - /** - * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty * - * Pipeline Options. It should only be used in tests. - */ public static BeamSqlEnv inMemory(TableProvider... tableProviders) { InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore(); for (TableProvider tableProvider : tableProviders) { @@ -137,7 +123,6 @@ public class BeamSqlEnv { private Set<Map.Entry<String, Function>> functionSet; private boolean autoLoadBuiltinFunctions; private boolean autoLoadUdfs; - private PipelineOptions pipelineOptions; private BeamSqlEnvBuilder(TableProvider tableProvider) { checkNotNull(tableProvider, "Table provider for the default schema must be sets."); @@ -148,7 +133,6 @@ public class BeamSqlEnv { functionSet = new HashSet<>(); autoLoadUdfs = false; autoLoadBuiltinFunctions = false; - pipelineOptions = null; } /** Add a top-level schema backed by the table provider. */ @@ -210,20 +194,14 @@ public class BeamSqlEnv { return this; } - public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) { - this.pipelineOptions = pipelineOptions; - return this; - } - /** * Build function to create an instance of BeamSqlEnv based on preset fields. * * @return BeamSqlEnv. */ public BeamSqlEnv build() { - checkNotNull(pipelineOptions); - JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions); + JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider); configureSchemas(jdbcConnection); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java index 4dfabb8..bb7cc42 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java @@ -20,14 +20,11 @@ package org.apache.beam.sdk.extensions.sql.impl; import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY; import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.service.AutoService; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; import java.util.function.Consumer; import org.apache.beam.sdk.extensions.sql.SqlTransform; @@ -103,8 +100,6 @@ public class JdbcDriver extends Driver { INSTANCE.register(); } - public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - @Override protected AvaticaFactory createFactory() { return JdbcFactory.wrap((CalciteFactory) super.createFactory()); @@ -146,7 +141,7 @@ public class JdbcDriver extends Driver { * not this path. The CLI ends up using the schema factory that populates the default schema with * all table providers it can find. See {@link BeamCalciteSchemaFactory}. */ - public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) { + public static JdbcConnection connect(TableProvider tableProvider) { try { Properties properties = new Properties(); properties.setProperty( @@ -154,36 +149,9 @@ public class JdbcDriver extends Driver { JdbcConnection connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties); connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider); - connection.setPipelineOptionsMap(getOptionsMap(options)); return connection; } catch (SQLException e) { throw new RuntimeException(e); } } - - /** Converts {@link PipelineOptions} to its map format. */ - private static Map<String, String> getOptionsMap(PipelineOptions options) { - Map map = OBJECT_MAPPER.convertValue(options, Map.class); - - map = (Map) map.get("options"); - if (map == null) { - map = new HashMap(); - } - - Map<String, String> optionMap = new HashMap<>(); - for (Object entry : map.entrySet()) { - Map.Entry ent = (Map.Entry) entry; - String value; - try { - value = - (ent.getValue() instanceof String) - ? ent.getValue().toString() - : OBJECT_MAPPER.writeValueAsString(ent.getValue()); - } catch (Exception e) { - throw new IllegalArgumentException("Unable to parse Pipeline Options", e); - } - optionMap.put(ent.getKey().toString(), value); - } - return optionMap; - } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index 222e4cf..621f149 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -17,27 +17,19 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; -import java.io.IOException; import java.io.Serializable; -import java.math.BigInteger; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently @@ -47,8 +39,6 @@ import org.slf4j.LoggerFactory; class BigQueryTable extends BaseBeamTable implements Serializable { @VisibleForTesting final String bqLocation; private final ConversionOptions conversionOptions; - private BeamRowCountStatistics rowCountStatistics = null; - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class); BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) { super(table.getSchema()); @@ -57,16 +47,6 @@ class BigQueryTable extends BaseBeamTable implements Serializable { } @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { - - if (rowCountStatistics == null) { - rowCountStatistics = getRowCountFromBQ(options, bqLocation); - } - - return rowCountStatistics; - } - - @Override public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; } @@ -92,23 +72,4 @@ class BigQueryTable extends BaseBeamTable implements Serializable { .withFormatFunction(BigQueryUtils.toTableRow()) .to(bqLocation)); } - - private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) { - try { - BigInteger rowCount = - BigQueryHelpers.getNumRows( - o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation)); - - if (rowCount == null) { - return BeamRowCountStatistics.UNKNOWN; - } - - return BeamRowCountStatistics.createBoundedTableStatistics(rowCount); - - } catch (IOException | InterruptedException e) { - LOGGER.warn("Could not get the row count for the table " + bqLocation, e); - } - - return BeamRowCountStatistics.UNKNOWN; - } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java index 3b6dda0..517309d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java @@ -24,7 +24,6 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import java.sql.Connection; import java.sql.ResultSet; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -43,7 +42,6 @@ public class BeamSqlEnvTest { BeamSqlEnv.builder(root) .addSchema("nested", nested) .addSchema("anotherOne", anotherOne) - .setPipelineOptions(PipelineOptionsFactory.create()) .build(); Connection connection = env.connection; @@ -62,9 +60,6 @@ public class BeamSqlEnvTest { exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound"))); TestTableProvider root = new TestTableProvider(); - BeamSqlEnv.builder(root) - .setQueryPlannerClassName("org.test.ClassNotFound") - .setPipelineOptions(PipelineOptionsFactory.create()) - .build(); + BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build(); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java index 3272d00..6f36173 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.values.Row; @@ -198,7 +197,7 @@ public class JdbcDriverTest { @Test public void testSelectsFromExistingTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); connection .createStatement() @@ -220,7 +219,7 @@ public class JdbcDriverTest { @Test public void testTimestampWithDefaultTimezone() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -251,7 +250,7 @@ public class JdbcDriverTest { public void testTimestampWithNonzeroTimezone() throws Exception { Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT); TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -281,7 +280,7 @@ public class JdbcDriverTest { public void testTimestampWithZeroTimezone() throws Exception { Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT); TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -310,7 +309,7 @@ public class JdbcDriverTest { @Test public void testSelectsFromExistingComplexTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); connection .createStatement() @@ -344,7 +343,7 @@ public class JdbcDriverTest { @Test public void testInsertIntoCreatedTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + Connection connection = JdbcDriver.connect(tableProvider); connection .createStatement() @@ -370,8 +369,7 @@ public class JdbcDriverTest { @Test public void testInternalConnect_boundedTable() throws Exception { - CalciteConnection connection = - JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("SELECT * FROM test"); assertTrue(resultSet.next()); @@ -394,8 +392,7 @@ public class JdbcDriverTest { .addRows(1, "second first") .addRows(2, "second"))); - CalciteConnection connection = - JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(tableProvider); Statement statement = connection.createStatement(); ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5"); assertTrue(resultSet1.next()); @@ -435,8 +432,7 @@ public class JdbcDriverTest { .timestampColumnIndex(3) .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE))); - CalciteConnection connection = - JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(tableProvider); Statement statement = connection.createStatement(); ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1"); @@ -474,8 +470,7 @@ public class JdbcDriverTest { @Test public void testInternalConnect_setDirectRunner() throws Exception { - CalciteConnection connection = - JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = direct")); assertTrue(statement.execute("SELECT * FROM test")); @@ -485,8 +480,7 @@ public class JdbcDriverTest { public void testInternalConnect_setBogusRunner() throws Exception { thrown.expectMessage("Unknown 'runner' specified 'bogus'"); - CalciteConnection connection = - JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = bogus")); assertTrue(statement.execute("SELECT * FROM test")); @@ -494,8 +488,7 @@ public class JdbcDriverTest { @Test public void testInternalConnect_resetAll() throws Exception { - CalciteConnection connection = - JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); + CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = bogus")); assertEquals(0, statement.executeUpdate("RESET ALL")); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java index 5d6f460..b7f4215 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.junit.Test; @@ -168,11 +167,7 @@ public class BeamDDLTest { TestTableProvider rootProvider = new TestTableProvider(); TestTableProvider testProvider = new TestTableProvider(); - BeamSqlEnv env = - BeamSqlEnv.builder(rootProvider) - .addSchema("test", testProvider) - .setPipelineOptions(PipelineOptionsFactory.create()) - .build(); + BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build(); assertNull(testProvider.getTables().get("person")); env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index 638c20f..22430ba 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver; import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; @@ -332,7 +331,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { */ public void check(Pipeline pipeline) throws Exception { checkPTransform(pipeline); - checkJdbc(pipeline.getOptions()); + checkJdbc(); } private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build(); @@ -354,7 +353,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { Schema.FieldType.STRING, "name") .addRows(1, "first"))); - private void checkJdbc(PipelineOptions pipelineOptions) throws Exception { + private void checkJdbc() throws Exception { // Beam SQL code is only invoked when the calling convention insists on it, so we // have to express this as selecting from a Beam table, even though the contents are // irrelevant. @@ -364,7 +363,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { // // Here we create a Beam table just to force the calling convention. TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions); + Connection connection = JdbcDriver.connect(tableProvider); connection .createStatement() .executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java deleted file mode 100644 index 23d09fc..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; - -import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; -import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; -import static org.apache.beam.sdk.schemas.Schema.toSchema; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import java.math.BigInteger; -import java.util.stream.Stream; -import org.apache.beam.sdk.extensions.sql.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.SqlTransform; -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; -import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Integration tests form writing to BigQuery with Beam SQL. */ -@RunWith(JUnit4.class) -public class BigQueryRowCountIT { - private static final Schema SOURCE_SCHEMA = - Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build(); - private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName"; - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - @Rule public transient TestPipeline readingPipeline = TestPipeline.create(); - @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA); - - @Test - public void testEmptyTable() { - BigQueryTableProvider provider = new BigQueryTableProvider(); - Table table = getTable("testTable", bigQuery.tableSpec()); - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); - assertNotNull(size); - assertEquals(BigInteger.ZERO, size.getRowCount()); - } - - @Test - public void testNonEmptyTable() { - BigQueryTableProvider provider = new BigQueryTableProvider(); - Table table = getTable("testTable", bigQuery.tableSpec()); - - pipeline - .apply( - Create.of( - new TableRow().set("id", 1).set("name", "name1"), - new TableRow().set("id", 2).set("name", "name2"), - new TableRow().set("id", 3).set("name", "name3")) - .withCoder(TableRowJsonCoder.of())) - .apply( - BigQueryIO.writeTableRows() - .to(bigQuery.tableSpec()) - .withSchema( - new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("id").setType("INTEGER"), - new TableFieldSchema().setName("name").setType("STRING")))) - .withoutValidation()); - pipeline.run().waitUntilFinish(); - - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); - - assertNotNull(size1); - assertEquals(BigInteger.valueOf(3), size1.getRowCount()); - } - - /** This tests if the pipeline options are injected in the path of SQL Transform. */ - @Test - public void testPipelineOptionInjection() { - BigQueryTestTableProvider provider = new BigQueryTestTableProvider(); - Table table = getTable("testTable", bigQuery.tableSpec()); - provider.addTable("testTable", table); - - pipeline - .apply( - Create.of( - new TableRow().set("id", 1).set("name", "name1"), - new TableRow().set("id", 2).set("name", "name2"), - new TableRow().set("id", 3).set("name", "name3")) - .withCoder(TableRowJsonCoder.of())) - .apply( - BigQueryIO.writeTableRows() - .to(bigQuery.tableSpec()) - .withSchema( - new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("id").setType("INTEGER"), - new TableFieldSchema().setName("name").setType("STRING")))) - .withoutValidation()); - pipeline.run().waitUntilFinish(); - - // changing pipeline options - readingPipeline.getOptions().setJobName(FAKE_JOB_NAME); - - // Reading from the table should update the statistics of bigQuery table - readingPipeline.apply( - SqlTransform.query(" select * from testTable ") - .withDefaultTableProvider("bigquery", provider)); - - readingPipeline.run().waitUntilFinish(); - - BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table); - assertEquals(FAKE_JOB_NAME, sqlTable.getJobName()); - } - - @Test - public void testFakeTable() { - BigQueryTableProvider provider = new BigQueryTableProvider(); - Table table = getTable("fakeTable", "project:dataset.table"); - - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); - BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); - assertTrue(size.isUnknown()); - } - - private static Table getTable(String name, String location) { - return Table.builder() - .name(name) - .comment(name + " table") - .location(location) - .schema( - Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING)) - .collect(toSchema())) - .type("bigquery") - .build(); - } -} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java deleted file mode 100644 index db954ae..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; - -import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.options.PipelineOptions; - -/** - * A BigQueryTable that keeps jobName from the pipeline options whenever row count is called. It is - * made for {@link BigQueryRowCountIT#testPipelineOptionInjection()} - */ -public class BigQueryTestTable extends BigQueryTable { - private String jobName = null; - - BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) { - super(table, options); - } - - @Override - public BeamRowCountStatistics getRowCount(PipelineOptions options) { - jobName = options.getJobName(); - return super.getRowCount(options); - } - - String getJobName() { - return this.jobName; - } -} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java deleted file mode 100644 index d8656ea..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; - -import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull; - -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.extensions.sql.BeamSqlTable; -import org.apache.beam.sdk.extensions.sql.meta.Table; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; - -/** A test table provider for BigQueryRowCountIT. */ -public class BigQueryTestTableProvider extends BigQueryTableProvider { - - private Map<String, Table> tableSpecMap; - private Map<String, BeamSqlTable> beamSqlTableMap; - - BigQueryTestTableProvider() { - super(); - tableSpecMap = new HashMap<>(); - beamSqlTableMap = new HashMap<>(); - } - - void addTable(String name, Table table) { - tableSpecMap.put(name, table); - } - - @Nullable - @Override - public Table getTable(String tableName) { - return tableSpecMap.get(tableName); - } - - @Override - public BeamSqlTable buildBeamSqlTable(Table table) { - BeamSqlTable t = beamSqlTableMap.get(table.getLocation()); - if (t != null) { - return t; - } - - t = - new BigQueryTestTable( - table, - BigQueryUtils.ConversionOptions.builder() - .setTruncateTimestamps( - firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false) - ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE - : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT) - .build()); - beamSqlTableMap.put(table.getLocation(), t); - - return t; - } -} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java index 1e89240..e8fa135 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java @@ -348,7 +348,7 @@ public class PubsubJsonIT implements Serializable { inMemoryMetaStore.registerProvider(tableProvider); } - JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options); + JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore); connection.setPipelineOptionsMap(argsMap); return connection; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 6c23708..82bb5ad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -25,14 +25,12 @@ import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; -import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto; import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; -import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -543,27 +541,6 @@ public class BigQueryHelpers { } } - /** - * It returns the number of rows for a given table. - * - * @param options - * @param tableRef - * @return The number of rows in the table. - * @throws InterruptedException - * @throws IOException - */ - @Nullable - public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef) - throws InterruptedException, IOException { - - DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options); - Table table = datasetService.getTable(tableRef); - if (table == null) { - return null; - } - return table.getNumRows(); - } - static String getDatasetLocation( DatasetService datasetService, String projectId, String datasetId) { Dataset dataset;