This is an automated email from the ASF dual-hosted git repository. anton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e868b80 [BEAM-7513] Adding RowCount to BigQueryTable. new b2fd4e3 Merge pull request #8892 from riazela/bigquery-rowcount e868b80 is described below commit e868b8043287cece938a1864066f33ec33f813e8 Author: Alireza Samadian <alireza4...@gmail.com> AuthorDate: Wed Jun 19 09:36:19 2019 -0700 [BEAM-7513] Adding RowCount to BigQueryTable. --- .../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 7 +- .../beam/sdk/extensions/sql/BeamSqlTable.java | 7 + .../beam/sdk/extensions/sql/SqlTransform.java | 2 + .../sdk/extensions/sql/impl/BeamCalciteSchema.java | 5 +- .../sql/impl/BeamCalciteSchemaFactory.java | 12 +- .../sdk/extensions/sql/impl/BeamCalciteTable.java | 49 ++++++- .../BeamRowCountStatistics.java} | 37 +++-- .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 26 +++- .../sdk/extensions/sql/impl/JdbcConnection.java | 9 ++ .../beam/sdk/extensions/sql/impl/JdbcDriver.java | 6 +- .../sql/meta/provider/bigquery/BigQueryTable.java | 39 +++++ .../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 7 +- .../sdk/extensions/sql/impl/JdbcDriverTest.java | 31 ++-- .../extensions/sql/impl/parser/BeamDDLTest.java | 7 +- ...BeamSqlBuiltinFunctionsIntegrationTestBase.java | 7 +- .../meta/provider/bigquery/BigQueryRowCountIT.java | 161 +++++++++++++++++++++ .../meta/provider/bigquery/BigQueryTestTable.java | 45 ++++++ .../bigquery/BigQueryTestTableProvider.java | 71 +++++++++ .../sql/meta/provider/pubsub/PubsubJsonIT.java | 2 +- .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 19 +++ 20 files changed, 498 insertions(+), 51 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 5e44c6c..8bdb1bf 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */ @Experimental @@ -34,15 +35,17 @@ public class BeamSqlCli { private MetaStore metaStore; public BeamSqlCli metaStore(MetaStore metaStore) { - return metaStore(metaStore, false); + return metaStore(metaStore, false, PipelineOptionsFactory.create()); } - public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) { + public BeamSqlCli metaStore( + MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) { this.metaStore = metaStore; BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore); if (autoLoadUdfUdaf) { builder.autoLoadUserDefinedFunctions(); } + builder.setPipelineOptions(pipelineOptions); this.env = builder.build(); return this; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java index 14f1b80..63f7158 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql; +import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -36,4 +38,9 @@ public interface 
BeamSqlTable { /** Get the schema info of the table. */ Schema getSchema(); + + /** Estimates the number of rows or returns null if there is no estimation. */ + default BeamRowCountStatistics getRowCount(PipelineOptions options) { + return BeamRowCountStatistics.UNKNOWN; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java index e45daca..afa4438 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java @@ -118,6 +118,8 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> sqlEnvBuilder.setQueryPlannerClassName( input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName()); + sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions()); + BeamSqlEnv sqlEnv = sqlEnvBuilder.build(); return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString())); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java index 1da4aae..ae84d36 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java @@ -104,7 +104,10 @@ public class BeamCalciteSchema implements Schema { if (table == null) { return null; } - return new BeamCalciteTable(tableProvider.buildBeamSqlTable(table), getPipelineOptions()); + return new BeamCalciteTable( + tableProvider.buildBeamSqlTable(table), + getPipelineOptions(), + connection.getPipelineOptions()); } @Override diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java index f6a016b..339810c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java @@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table; * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection * properties). * - * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid - * loading all available table providers. + * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider, + * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers. */ class BeamCalciteSchemaFactory { @@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory { } /** - * This is the override to create an empty schema, used in {@link - * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is - * expected to be replaced by an actual functional schema by the same code that specified this - * override in the first place. + * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider + * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This + * schema is expected to be replaced by an actual functional schema by the same code that + * specified this override in the first place. 
*/ public static class Empty extends InitialEmptySchema implements SchemaFactory { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index e800c82..50a131a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -21,9 +21,12 @@ import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; import org.apache.calcite.adapter.java.AbstractQueryableTable; import org.apache.calcite.linq4j.QueryProvider; @@ -38,22 +41,31 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ModifiableTable; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; import org.apache.calcite.schema.TranslatableTable; /** Adapter from {@link BeamSqlTable} to a calcite Table. */ public class BeamCalciteTable extends AbstractQueryableTable implements ModifiableTable, TranslatableTable { private final BeamSqlTable beamTable; - private final Map<String, String> pipelineOptions; + // These two options should be unified. 
+ // https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7590 + private final Map<String, String> pipelineOptionsMap; + private PipelineOptions pipelineOptions; - BeamCalciteTable(BeamSqlTable beamTable, Map<String, String> pipelineOptions) { + BeamCalciteTable( + BeamSqlTable beamTable, + Map<String, String> pipelineOptionsMap, + PipelineOptions pipelineOptions) { super(Object[].class); this.beamTable = beamTable; + this.pipelineOptionsMap = pipelineOptionsMap; this.pipelineOptions = pipelineOptions; } public static BeamCalciteTable of(BeamSqlTable table) { - return new BeamCalciteTable(table, ImmutableMap.of()); + return new BeamCalciteTable(table, ImmutableMap.of(), null); } @Override @@ -61,9 +73,36 @@ public class BeamCalciteTable extends AbstractQueryableTable return CalciteUtils.toCalciteRowType(this.beamTable.getSchema(), typeFactory); } + private PipelineOptions getPipelineOptions() { + if (pipelineOptions != null) { + return pipelineOptions; + } + + pipelineOptions = BeamEnumerableConverter.createPipelineOptions(pipelineOptionsMap); + return pipelineOptions; + } + + @Override + public Statistic getStatistic() { + /* + Changing class loader is required for the JDBC path. It is similar to what done in + {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable }. + */ + final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader()); + BeamRowCountStatistics beamStatistics = beamTable.getRowCount(getPipelineOptions()); + return beamStatistics.isUnknown() + ? 
Statistics.UNKNOWN + : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of()); + } finally { + Thread.currentThread().setContextClassLoader(originalClassLoader); + } + } + @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) { - return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions); + return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptionsMap); } @Override @@ -97,6 +136,6 @@ public class BeamCalciteTable extends AbstractQueryableTable sourceExpressionList, flattened, beamTable, - pipelineOptions); + pipelineOptionsMap); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java similarity index 51% copy from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java copy to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java index 14f1b80..ac0431d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java @@ -15,25 +15,30 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.extensions.sql; +package org.apache.beam.sdk.extensions.sql.impl; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.Row; +import java.io.Serializable; +import java.math.BigInteger; -/** This interface defines a Beam Sql Table. */ -public interface BeamSqlTable { - /** create a {@code PCollection<Row>} from source. 
*/ - PCollection<Row> buildIOReader(PBegin begin); +/** This class stores row count statistics. */ +public class BeamRowCountStatistics implements Serializable { + public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null); + private final BigInteger rowCount; - /** create a {@code IO.write()} instance to write to target. */ - POutput buildIOWriter(PCollection<Row> input); + private BeamRowCountStatistics(BigInteger rowCount) { + this.rowCount = rowCount; + } - /** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */ - PCollection.IsBounded isBounded(); + public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) { + return new BeamRowCountStatistics(rowCount); + } - /** Get the schema info of the table. */ - Schema getSchema(); + /** Is true if the row count cannot be estimated. */ + public boolean isUnknown() { + return rowCount == null; + } + + public BigInteger getRowCount() { + return rowCount; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index 02b3e69..ae4238d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -38,6 +38,8 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.SerializableFunction; 
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings; @@ -66,14 +68,26 @@ public class BeamSqlEnv { return new BeamSqlEnvBuilder(tableProvider); } + /** + * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty + * Pipeline Options. It should only be used in tests. + */ public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) { return withTableProvider(new ReadOnlyTableProvider(tableType, tables)); } + /** + * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty + * Pipeline Options. It should only be used in tests. + */ public static BeamSqlEnv withTableProvider(TableProvider tableProvider) { - return builder(tableProvider).build(); + return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build(); } + /** + * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty * + * Pipeline Options. It should only be used in tests. + */ public static BeamSqlEnv inMemory(TableProvider... tableProviders) { InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore(); for (TableProvider tableProvider : tableProviders) { @@ -123,6 +137,7 @@ public class BeamSqlEnv { private Set<Map.Entry<String, Function>> functionSet; private boolean autoLoadBuiltinFunctions; private boolean autoLoadUdfs; + private PipelineOptions pipelineOptions; private BeamSqlEnvBuilder(TableProvider tableProvider) { checkNotNull(tableProvider, "Table provider for the default schema must be sets."); @@ -133,6 +148,7 @@ public class BeamSqlEnv { functionSet = new HashSet<>(); autoLoadUdfs = false; autoLoadBuiltinFunctions = false; + pipelineOptions = null; } /** Add a top-level schema backed by the table provider. 
*/ @@ -194,14 +210,20 @@ public class BeamSqlEnv { return this; } + public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + return this; + } + /** * Build function to create an instance of BeamSqlEnv based on preset fields. * * @return BeamSqlEnv. */ public BeamSqlEnv build() { + checkNotNull(pipelineOptions); - JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider); + JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions); configureSchemas(jdbcConnection); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java index b8ad7f0..8969cd5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java @@ -45,6 +45,7 @@ public class JdbcConnection extends CalciteConnectionWrapper { private static final String PIPELINE_OPTION_PREFIX = "beam."; private Map<String, String> pipelineOptionsMap; + private PipelineOptions pipelineOptions; private JdbcConnection(CalciteConnection connection) throws SQLException { super(connection); @@ -97,6 +98,14 @@ public class JdbcConnection extends CalciteConnectionWrapper { this.pipelineOptionsMap = ImmutableMap.copyOf(pipelineOptionsMap); } + public void setPipelineOptions(PipelineOptions pipelineOptions) { + this.pipelineOptions = pipelineOptions; + } + + public PipelineOptions getPipelineOptions() { + return this.pipelineOptions; + } + /** Get the current default schema from the root schema. 
*/ @SuppressWarnings("TypeParameterUnusedInFormals") <T> T getCurrentBeamSchema() { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java index bb7cc42..edea9db 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.extensions.sql.impl; import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY; import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.auto.service.AutoService; import java.sql.Connection; import java.sql.SQLException; @@ -100,6 +101,8 @@ public class JdbcDriver extends Driver { INSTANCE.register(); } + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Override protected AvaticaFactory createFactory() { return JdbcFactory.wrap((CalciteFactory) super.createFactory()); @@ -141,7 +144,7 @@ public class JdbcDriver extends Driver { * not this path. The CLI ends up using the schema factory that populates the default schema with * all table providers it can find. See {@link BeamCalciteSchemaFactory}. 
*/ - public static JdbcConnection connect(TableProvider tableProvider) { + public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) { try { Properties properties = new Properties(); properties.setProperty( @@ -149,6 +152,7 @@ public class JdbcDriver extends Driver { JdbcConnection connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties); connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider); + connection.setPipelineOptions(options); return connection; } catch (SQLException e) { throw new RuntimeException(e); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java index 621f149..222e4cf 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java @@ -17,19 +17,27 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; +import java.io.IOException; import java.io.Serializable; +import java.math.BigInteger; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.values.PBegin; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently @@ -39,6 +47,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF class BigQueryTable extends BaseBeamTable implements Serializable { @VisibleForTesting final String bqLocation; private final ConversionOptions conversionOptions; + private BeamRowCountStatistics rowCountStatistics = null; + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class); BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) { super(table.getSchema()); @@ -47,6 +57,16 @@ class BigQueryTable extends BaseBeamTable implements Serializable { } @Override + public BeamRowCountStatistics getRowCount(PipelineOptions options) { + + if (rowCountStatistics == null) { + rowCountStatistics = getRowCountFromBQ(options, bqLocation); + } + + return rowCountStatistics; + } + + @Override public PCollection.IsBounded isBounded() { return PCollection.IsBounded.BOUNDED; } @@ -72,4 +92,23 @@ class BigQueryTable extends BaseBeamTable implements Serializable { .withFormatFunction(BigQueryUtils.toTableRow()) .to(bqLocation)); } + + private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) { + try { + BigInteger rowCount = + BigQueryHelpers.getNumRows( + o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation)); + + if (rowCount == null) { + return BeamRowCountStatistics.UNKNOWN; + } + + return BeamRowCountStatistics.createBoundedTableStatistics(rowCount); + + } catch (IOException | InterruptedException e) { + LOGGER.warn("Could not get the row count for the table " + bqLocation, e); + } + + return BeamRowCountStatistics.UNKNOWN; 
+ } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java index 517309d..3b6dda0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java @@ -24,6 +24,7 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import java.sql.Connection; import java.sql.ResultSet; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -42,6 +43,7 @@ public class BeamSqlEnvTest { BeamSqlEnv.builder(root) .addSchema("nested", nested) .addSchema("anotherOne", anotherOne) + .setPipelineOptions(PipelineOptionsFactory.create()) .build(); Connection connection = env.connection; @@ -60,6 +62,9 @@ public class BeamSqlEnvTest { exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound"))); TestTableProvider root = new TestTableProvider(); - BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build(); + BeamSqlEnv.builder(root) + .setQueryPlannerClassName("org.test.ClassNotFound") + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java index 6f36173..3272d00 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java @@ -46,6 +46,7 @@ import 
org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.values.Row; @@ -197,7 +198,7 @@ public class JdbcDriverTest { @Test public void testSelectsFromExistingTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); connection .createStatement() @@ -219,7 +220,7 @@ public class JdbcDriverTest { @Test public void testTimestampWithDefaultTimezone() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -250,7 +251,7 @@ public class JdbcDriverTest { public void testTimestampWithNonzeroTimezone() throws Exception { Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT); TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -280,7 +281,7 @@ public class JdbcDriverTest { public void testTimestampWithZeroTimezone() throws Exception { Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), 
Locale.ROOT); TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); // A table with one TIMESTAMP column Schema schema = Schema.builder().addDateTimeField("ts").build(); @@ -309,7 +310,7 @@ public class JdbcDriverTest { @Test public void testSelectsFromExistingComplexTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); connection .createStatement() @@ -343,7 +344,7 @@ public class JdbcDriverTest { @Test public void testInsertIntoCreatedTable() throws Exception { TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); connection .createStatement() @@ -369,7 +370,8 @@ public class JdbcDriverTest { @Test public void testInternalConnect_boundedTable() throws Exception { - CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); + CalciteConnection connection = + JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("SELECT * FROM test"); assertTrue(resultSet.next()); @@ -392,7 +394,8 @@ public class JdbcDriverTest { .addRows(1, "second first") .addRows(2, "second"))); - CalciteConnection connection = JdbcDriver.connect(tableProvider); + CalciteConnection connection = + JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5"); assertTrue(resultSet1.next()); @@ -432,7 +435,8 @@ public class JdbcDriverTest { 
.timestampColumnIndex(3) .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE))); - CalciteConnection connection = JdbcDriver.connect(tableProvider); + CalciteConnection connection = + JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1"); @@ -470,7 +474,8 @@ public class JdbcDriverTest { @Test public void testInternalConnect_setDirectRunner() throws Exception { - CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); + CalciteConnection connection = + JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = direct")); assertTrue(statement.execute("SELECT * FROM test")); @@ -480,7 +485,8 @@ public class JdbcDriverTest { public void testInternalConnect_setBogusRunner() throws Exception { thrown.expectMessage("Unknown 'runner' specified 'bogus'"); - CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); + CalciteConnection connection = + JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = bogus")); assertTrue(statement.execute("SELECT * FROM test")); @@ -488,7 +494,8 @@ public class JdbcDriverTest { @Test public void testInternalConnect_resetAll() throws Exception { - CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE); + CalciteConnection connection = + JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()); Statement statement = connection.createStatement(); assertEquals(0, statement.executeUpdate("SET runner = bogus")); assertEquals(0, statement.executeUpdate("RESET ALL")); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java index b7f4215..5d6f460 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl; import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.meta.Table; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.junit.Test; @@ -167,7 +168,11 @@ public class BeamDDLTest { TestTableProvider rootProvider = new TestTableProvider(); TestTableProvider testProvider = new TestTableProvider(); - BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build(); + BeamSqlEnv env = + BeamSqlEnv.builder(rootProvider) + .addSchema("test", testProvider) + .setPipelineOptions(PipelineOptionsFactory.create()) + .build(); assertNull(testProvider.getTables().get("person")); env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index 22430ba..638c20f 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver; 
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable; import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; @@ -331,7 +332,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { */ public void check(Pipeline pipeline) throws Exception { checkPTransform(pipeline); - checkJdbc(); + checkJdbc(pipeline.getOptions()); } private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build(); @@ -353,7 +354,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { Schema.FieldType.STRING, "name") .addRows(1, "first"))); - private void checkJdbc() throws Exception { + private void checkJdbc(PipelineOptions pipelineOptions) throws Exception { // Beam SQL code is only invoked when the calling convention insists on it, so we // have to express this as selecting from a Beam table, even though the contents are // irrelevant. @@ -363,7 +364,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { // // Here we create a Beam table just to force the calling convention. 
TestTableProvider tableProvider = new TestTableProvider(); - Connection connection = JdbcDriver.connect(tableProvider); + Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions); connection .createStatement() .executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'"); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java new file mode 100644 index 0000000..23d09fc --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; + +import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64; +import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING; +import static org.apache.beam.sdk.schemas.Schema.toSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.math.BigInteger; +import java.util.stream.Stream; +import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; +import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for writing to BigQuery with Beam SQL.
*/ +@RunWith(JUnit4.class) +public class BigQueryRowCountIT { + private static final Schema SOURCE_SCHEMA = + Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build(); + private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName"; + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient TestPipeline readingPipeline = TestPipeline.create(); + @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA); + + @Test + public void testEmptyTable() { + BigQueryTableProvider provider = new BigQueryTableProvider(); + Table table = getTable("testTable", bigQuery.tableSpec()); + BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); + BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + assertNotNull(size); + assertEquals(BigInteger.ZERO, size.getRowCount()); + } + + @Test + public void testNonEmptyTable() { + BigQueryTableProvider provider = new BigQueryTableProvider(); + Table table = getTable("testTable", bigQuery.tableSpec()); + + pipeline + .apply( + Create.of( + new TableRow().set("id", 1).set("name", "name1"), + new TableRow().set("id", 2).set("name", "name2"), + new TableRow().set("id", 3).set("name", "name3")) + .withCoder(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to(bigQuery.tableSpec()) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("id").setType("INTEGER"), + new TableFieldSchema().setName("name").setType("STRING")))) + .withoutValidation()); + pipeline.run().waitUntilFinish(); + + BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); + BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + + assertNotNull(size1); + assertEquals(BigInteger.valueOf(3), size1.getRowCount()); + } + + /** This tests if the pipeline options are injected in the path of SQL Transform. 
*/ + @Test + public void testPipelineOptionInjection() { + BigQueryTestTableProvider provider = new BigQueryTestTableProvider(); + Table table = getTable("testTable", bigQuery.tableSpec()); + provider.addTable("testTable", table); + + pipeline + .apply( + Create.of( + new TableRow().set("id", 1).set("name", "name1"), + new TableRow().set("id", 2).set("name", "name2"), + new TableRow().set("id", 3).set("name", "name3")) + .withCoder(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to(bigQuery.tableSpec()) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("id").setType("INTEGER"), + new TableFieldSchema().setName("name").setType("STRING")))) + .withoutValidation()); + pipeline.run().waitUntilFinish(); + + // changing pipeline options + readingPipeline.getOptions().setJobName(FAKE_JOB_NAME); + + // Reading from the table should update the statistics of bigQuery table + readingPipeline.apply( + SqlTransform.query(" select * from testTable ") + .withDefaultTableProvider("bigquery", provider)); + + readingPipeline.run().waitUntilFinish(); + + BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table); + assertEquals(FAKE_JOB_NAME, sqlTable.getJobName()); + } + + @Test + public void testFakeTable() { + BigQueryTableProvider provider = new BigQueryTableProvider(); + Table table = getTable("fakeTable", "project:dataset.table"); + + BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); + BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions()); + assertTrue(size.isUnknown()); + } + + private static Table getTable(String name, String location) { + return Table.builder() + .name(name) + .comment(name + " table") + .location(location) + .schema( + Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING)) + .collect(toSchema())) + .type("bigquery") + .build(); + } +} diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java new file mode 100644 index 0000000..db954ae --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; + +import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * A BigQueryTable that keeps jobName from the pipeline options whenever row count is called. 
It is + * made for {@link BigQueryRowCountIT#testPipelineOptionInjection()} + */ +public class BigQueryTestTable extends BigQueryTable { + private String jobName = null; + + BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) { + super(table, options); + } + + @Override + public BeamRowCountStatistics getRowCount(PipelineOptions options) { + jobName = options.getJobName(); + return super.getRowCount(options); + } + + String getJobName() { + return this.jobName; + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java new file mode 100644 index 0000000..d8656ea --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.extensions.sql.BeamSqlTable; +import org.apache.beam.sdk.extensions.sql.meta.Table; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; + +/** A test table provider for BigQueryRowCountIT. */ +public class BigQueryTestTableProvider extends BigQueryTableProvider { + + private Map<String, Table> tableSpecMap; + private Map<String, BeamSqlTable> beamSqlTableMap; + + BigQueryTestTableProvider() { + super(); + tableSpecMap = new HashMap<>(); + beamSqlTableMap = new HashMap<>(); + } + + void addTable(String name, Table table) { + tableSpecMap.put(name, table); + } + + @Nullable + @Override + public Table getTable(String tableName) { + return tableSpecMap.get(tableName); + } + + @Override + public BeamSqlTable buildBeamSqlTable(Table table) { + BeamSqlTable t = beamSqlTableMap.get(table.getLocation()); + if (t != null) { + return t; + } + + t = + new BigQueryTestTable( + table, + BigQueryUtils.ConversionOptions.builder() + .setTruncateTimestamps( + firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false) + ? 
BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE + : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT) + .build()); + beamSqlTableMap.put(table.getLocation(), t); + + return t; + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java index 2cda8cb..4b4ceeb 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java @@ -349,7 +349,7 @@ public class PubsubJsonIT implements Serializable { inMemoryMetaStore.registerProvider(tableProvider); } - JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore); + JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options); connection.setPipelineOptionsMap(argsMap); return connection; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java index 82bb5ad..60764bb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java @@ -25,12 +25,14 @@ import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; import com.google.api.services.bigquery.model.TimePartitioning; 
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto; import com.google.cloud.hadoop.util.ApiErrorExtractor; import java.io.IOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -541,6 +543,23 @@ public class BigQueryHelpers { } } + /** + * It returns the number of rows for a given table. + * + * @return The number of rows in the table or null if it cannot get any estimate. + */ + @Nullable + public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef) + throws InterruptedException, IOException { + + DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options); + Table table = datasetService.getTable(tableRef); + if (table == null) { + return null; + } + return table.getNumRows(); + } + static String getDatasetLocation( DatasetService datasetService, String projectId, String datasetId) { Dataset dataset;