This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 080f54a144a [BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK. (#16818)
080f54a144a is described below

commit 080f54a144a5751c7ab43aadb2fb87b8d49970d0
Author: Daniel Oliveira <daniel.o.program...@gmail.com>
AuthorDate: Mon Jun 13 21:21:26 2022 -0700

    [BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK. (#16818)
    
    * [BEAM-13806] Add x-lang BigQuery IO integration test to Go SDK.
    
    Also includes piping in flags for BigQuery IO through the integration test script, and a small file for creating BigQuery tables that expire after a day.
    
    * [BEAM-13806] Splitting BigQuery IT into read and write pipelines.
    
    Splits the integration test into two pipelines to run sequentially. Also drops the table after a successful test and logs table names.
    
    * Fixup: Fix gradle build and undo VR script changes.
    
    * Fixup: Add Query test and fix deterministic random element generation.
    
    CreateRows wasn't creating the same elements in both read and write pipelines after splitting the two pipelines. Adjusted it to use a consistent seed in all pipelines.
    
    * Fixup: Avoiding inline functions
    
    * Workaround to coder issue, plus some debugging code
    
    * Polishing workaround with documentation and removing debug prints.
    
    * Move pipeline code to test file
    
    * Split Query test from non-query test
    
    Co-authored-by: Robert Burke <lostl...@users.noreply.github.com>
---
 sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go   |   7 +
 sdks/go/test/build.gradle                          |   4 +
 sdks/go/test/integration/flags.go                  |   5 +
 sdks/go/test/integration/integration.go            |  11 +
 .../test/integration/io/xlang/bigquery/bigquery.go |  17 ++
 .../integration/io/xlang/bigquery/bigquery_test.go | 270 +++++++++++++++++++++
 .../go/test/integration/io/xlang/bigquery/table.go |  52 ++++
 7 files changed, 366 insertions(+)

diff --git a/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
index 19f2d626a77..6bfc8939106 100644
--- a/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
+++ b/sdks/go/pkg/beam/io/xlang/bigqueryio/bigquery.go
@@ -187,8 +187,15 @@ func FromTable(table string) readOption {
 // FromQuery is a Read option that specifies a query to use for reading from BigQuery. Uses the
 // BigQuery Standard SQL dialect.
 //
+// Important: When reading from a query, the schema of any source tables is not used and the read
+// transform cannot detect which elements are Required, therefore every field in the output type
+// will be a pointer (including fields within inner structs).
+//
 // For more details see in the Java SDK:
 // org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Read.fromQuery(java.lang.String)
+//
+// BUG(https://github.com/apache/beam/issues/21784): Query read outputs currently cannot be named
+// struct types. See link for workaround.
 func FromQuery(query string) readOption {
        return func(rc *readConfig) {
                rc.cfg.Query = &query
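
To illustrate the caveat above: a minimal sketch of a query read, assuming a hypothetical table and expansion address (the integration test added below follows the same pattern):

    package example

    import (
    	"reflect"

    	"github.com/apache/beam/sdks/v2/go/pkg/beam"
    	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
    )

    // queryRow is a type alias to an anonymous struct with pointer fields,
    // per the BUG note: query read outputs currently cannot be named struct
    // types, and every field comes back as a pointer.
    type queryRow = struct {
    	Counter *int64 `beam:"counter"`
    }

    // readViaQuery reads rows from a hypothetical table via a SQL query.
    func readViaQuery(s beam.Scope, expansionAddr string) beam.PCollection {
    	t := reflect.TypeOf((*queryRow)(nil)).Elem()
    	return bigqueryio.Read(s, t,
    		bigqueryio.FromQuery("SELECT counter FROM `project.dataset.table`"),
    		bigqueryio.ReadExpansionAddr(expansionAddr))
    }
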
diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle
index 28e8c548285..4ea0c57f858 100644
--- a/sdks/go/test/build.gradle
+++ b/sdks/go/test/build.gradle
@@ -173,18 +173,22 @@ ext.goIoValidatesRunnerTask = { proj, name, scriptOpts, pipelineOpts ->
     dependsOn ":sdks:java:io:expansion-service:build"
     dependsOn ":sdks:java:extensions:schemaio-expansion-service:build"
     dependsOn ":sdks:java:io:debezium:expansion-service:build"
+    dependsOn ":sdks:java:io:google-cloud-platform:expansion-service:build"
     dependsOn ":sdks:java:testing:kafka-service:buildTestKafkaServiceJar"
 
     doLast {
      def ioExpJar = project(":sdks:java:io:expansion-service").shadowJar.archivePath
      def schemaIoExpJar = project(":sdks:java:extensions:schemaio-expansion-service").shadowJar.archivePath
      def debeziumIoExpJar = project(":sdks:java:io:debezium:expansion-service").shadowJar.archivePath
+      def gcpIoExpJar = project(":sdks:java:io:google-cloud-platform:expansion-service").shadowJar.archivePath
      def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
      def pipelineOptions = [  // Pipeline options piped directly to Go SDK flags.
           "--kafka_jar=${kafkaJar}",
           "--expansion_jar=io:${ioExpJar}",
           "--expansion_jar=schemaio:${schemaIoExpJar}",
           "--expansion_jar=debeziumio:${debeziumIoExpJar}",
+          "--expansion_jar=gcpio:${gcpIoExpJar}",
+          "--bq_dataset=apache-beam-testing.beam_bigquery_io_test_temp",
       ]
       pipelineOptions.addAll(pipelineOpts)
       def options = [
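
The "--expansion_jar" values use a "label:jar" convention consumed by the Go integration package; a hedged sketch of how one value splits (hypothetical helper, not the actual parsing code):

    package example

    import (
    	"fmt"
    	"strings"
    )

    // splitLabelJar splits an --expansion_jar value such as
    // "gcpio:/path/to/expansion-service.jar" into its label and jar path.
    func splitLabelJar(v string) (label, jar string, err error) {
    	parts := strings.SplitN(v, ":", 2)
    	if len(parts) != 2 {
    		return "", "", fmt.Errorf("expected label:jar, got %q", v)
    	}
    	return parts[0], parts[1], nil
    }
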
diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go
index 72954f2be06..3a58826a436 100644
--- a/sdks/go/test/integration/flags.go
+++ b/sdks/go/test/integration/flags.go
@@ -42,6 +42,11 @@ var (
                "Sets an auto-shutdown timeout to the Kafka cluster. "+
                        "Requires the timeout command to be present in Path, 
unless the value is set to \"\".")
 
+       // BigQueryDataset is the name of the dataset to create tables in for
+       // BigQuery integration tests.
+       BigQueryDataset = flag.String("bq_dataset", "",
+               "Name of the dataset to create tables in for BigQuery tests.")
+
        // ExpansionJars contains elements in the form "label:jar" describing jar
        // filepaths for expansion services to use in integration tests, and the
        // corresponding labels. Once provided through this flag, those jars can
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 939bb6df2c5..2af95303304 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -64,6 +64,7 @@ var directFilters = []string{
        // The direct runner does not yet support cross-language.
        "TestXLang.*",
        "TestKafkaIO.*",
+       "TestBigQueryIO.*",
        "TestDebeziumIO_BasicRead",
        "TestJDBCIO_BasicReadWrite",
        "TestJDBCIO_PostgresReadWrite",
@@ -93,6 +94,8 @@ var portableFilters = []string{
        "TestPanes",
        // TODO(BEAM-12797): Python portable runner times out on Kafka reads.
        "TestKafkaIO.*",
+       // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+       "TestBigQueryIO.*",
        // The portable runner does not support self-checkpointing
        "TestCheckpointing",
        // The portable runner does not support pipeline drain for SDF.
@@ -107,6 +110,8 @@ var flinkFilters = []string{
        // TODO(BEAM-12815): Test fails on post commits: "Insufficient number of network buffers".
        "TestXLang_Multi",
        "TestDebeziumIO_BasicRead",
+       // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+       "TestBigQueryIO.*",
        // The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs.
        "TestTriggerAfterSynchronizedProcessingTime",
        // The flink runner does not support pipeline drain for SDF.
@@ -126,6 +131,8 @@ var samzaFilters = []string{
        "TestPanes",
        // TODO(BEAM-13006): Samza doesn't yet support post job metrics, used by WordCount
        "TestWordCount.*",
+       // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+       "TestBigQueryIO.*",
        // The Samza runner does not support self-checkpointing
        "TestCheckpointing",
        // The samza runner does not support pipeline drain for SDF.
@@ -146,6 +153,8 @@ var sparkFilters = []string{
        "TestPanes",
        // [BEAM-13921]: Spark doesn't support side inputs to executable stages
        "TestDebeziumIO_BasicRead",
+       // TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
+       "TestBigQueryIO.*",
        // The spark runner does not support self-checkpointing
        "TestCheckpointing",
        // The spark runner does not support pipeline drain for SDF.
@@ -174,6 +183,8 @@ var dataflowFilters = []string{
        // Dataflow does not automatically terminate the TestCheckpointing pipeline when
        // complete.
        "TestCheckpointing",
+       // TODO(21761): This test needs to provide a GCP project to the expansion service.
+       "TestBigQueryIO_BasicWriteQueryRead",
        // Dataflow does not drain jobs by itself.
        "TestDrain",
 }
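
Each runner's filter list gates tests by name before they run; a sketch of the idea, assuming the filters are regular expressions matched against test names (hypothetical helper; integration.CheckFilters implements the real logic):

    package example

    import "regexp"

    // isFiltered reports whether a test name matches any of the runner's
    // filters, e.g. "TestBigQueryIO.*" matches TestBigQueryIO_BasicWriteRead.
    func isFiltered(filters []string, testName string) bool {
    	for _, f := range filters {
    		if matched, _ := regexp.MatchString(f, testName); matched {
    			return true
    		}
    	}
    	return false
    }
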
diff --git a/sdks/go/test/integration/io/xlang/bigquery/bigquery.go b/sdks/go/test/integration/io/xlang/bigquery/bigquery.go
new file mode 100644
index 00000000000..37ea08256d5
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/bigquery.go
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package bigquery contains integration tests for cross-language BigQuery IO transforms.
+package bigquery
diff --git a/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go
new file mode 100644
index 00000000000..0f7516d1986
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/bigquery_test.go
@@ -0,0 +1,270 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigquery
+
+import (
+       "flag"
+       "fmt"
+       "log"
+       "math/rand"
+       "reflect"
+       "strings"
+       "testing"
+       "time"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+       _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+func init() {
+       register.DoFn2x0[[]byte, func(TestRow)](&CreateTestRowsFn{})
+       register.Emitter1[TestRow]()
+       // TODO(https://github.com/apache/beam/issues/21789): Uncomment once this registration no longer panics.
+       //register.Function1x1(castFn)
+}
+
+var expansionAddr string // Populated with the expansion address labeled "gcpio".
+
+func checkFlags(t *testing.T) {
+       if *integration.BigQueryDataset == "" {
+               t.Skip("No BigQuery dataset provided.")
+       }
+}
+
+const (
+       // Sample text to shuffle to get random words.
+       text = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas eget nulla nec " +
+               "velit hendrerit placerat. Donec eu odio ultricies, fermentum arcu at, mollis lectus. " +
+               "Vestibulum porttitor pharetra sem vitae feugiat. Mauris facilisis neque in mauris " +
+               "feugiat rhoncus. Donec eu ipsum at nibh lobortis euismod. Nam at hendrerit felis. " +
+               "Vivamus et orci ex. Nam dui nisl, rutrum ac pretium eget, vehicula in tortor. Class " +
+               "aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. " +
+               "Phasellus ante lorem, pharetra blandit dapibus et, tempus nec purus. Maecenas in " +
+               "posuere sem, vel pharetra nisl. Pellentesque habitant morbi tristique senectus et netus " +
+               "et malesuada fames ac turpis egestas. Donec nec facilisis ex. Praesent euismod commodo " +
+               "efficitur. Fusce in nisi nunc."
+       // Number of random elements to create for the test. Must be less than the number of words in text.
+       inputSize = 50
+)
+
+// TestRow is a sample row to write and read back that is expected to contain enough deterministic
+// and random data in different data types to provide a reasonable signal that reading and writing
+// work at a basic level.
+type TestRow struct {
+       Counter   int64    `beam:"counter"`   // A deterministic counter that increments for each row generated.
+       Rand_data RandData `beam:"rand_data"` // An inner struct containing randomized data.
+}
+
+func shuffleText() []string {
+       words := strings.Fields(text)
+       rand.Shuffle(len(words), func(i, j int) { words[i], words[j] = words[j], words[i] })
+       return words
+}
+
+// RandData is a struct of various types of random data.
+type RandData struct {
+       Flip bool   `beam:"flip"` // Flip is a bool with a random chance of either result (a coin flip).
+       Num  int64  `beam:"num"`  // Num is a random int64.
+       Word string `beam:"word"` // Word is a randomly selected word from a sample text.
+}
+
+// ddlTestRowSchema is a string of BigQuery Data Definition Language (DDL) that corresponds to TestRow.
+const ddlTestRowSchema = "counter INT64 NOT NULL, " +
+       "rand_data STRUCT<" +
+       "flip BOOL NOT NULL," +
+       "num INT64 NOT NULL," +
+       "word STRING NOT NULL" +
+       "> NOT NULL"
+
+// CreateTestRowsFn is a DoFn that creates randomized TestRows based on a seed.
+type CreateTestRowsFn struct {
+       seed int64
+}
+
+// ProcessElement creates a number of TestRows, populating the randomized data.
+func (fn *CreateTestRowsFn) ProcessElement(_ []byte, emit func(TestRow)) {
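+       // Seeding the shared source with a fixed per-test seed keeps generation
+       // deterministic, so separate write and read pipelines emit identical rows.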
+       rand.Seed(fn.seed)
+       words := shuffleText()
+       for i := 0; i < inputSize; i++ {
+               emit(TestRow{
+                       Counter: int64(i),
+                       Rand_data: RandData{
+                               Flip: rand.Int63n(2) != 0,
+                               Num:  rand.Int63(),
+                               Word: words[i],
+                       },
+               })
+       }
+}
+
+// WritePipeline creates a pipeline that writes elements created by createFn into a BigQuery table.
+func WritePipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       // Generate elements and write to table.
+       rows := beam.ParDo(s, createFn, beam.Impulse(s))
+       bigqueryio.Write(s, table, rows,
+               bigqueryio.CreateDisposition(bigqueryio.CreateNever),
+               bigqueryio.WriteExpansionAddr(expansionAddr))
+
+       return p
+}
+
+// ReadPipeline creates a pipeline that reads elements directly from a BigQuery table and asserts
+// that they match elements created by createFn.
+func ReadPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       // Read from table and compare to generated elements.
+       rows := beam.ParDo(s, createFn, beam.Impulse(s))
+       inType := reflect.TypeOf((*TestRow)(nil)).Elem()
+       readRows := bigqueryio.Read(s, inType,
+               bigqueryio.FromTable(table),
+               bigqueryio.ReadExpansionAddr(expansionAddr))
+       passert.Equals(s, readRows, rows)
+
+       return p
+}
+
+// TestRowPtrs is equivalent to TestRow but all fields are pointers, meant to be used when reading
+// via query.
+//
+// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
+type TestRowPtrs = struct {
+       Counter   *int64        `beam:"counter"`
+       Rand_data *RandDataPtrs `beam:"rand_data"`
+}
+
+// RandDataPtrs is equivalent to RandData but all fields are pointers, meant to be used when reading
+// via query.
+//
+// TODO(https://github.com/apache/beam/issues/21784): Change back to a named struct once resolved.
+type RandDataPtrs = struct {
+       Flip *bool   `beam:"flip"`
+       Num  *int64  `beam:"num"`
+       Word *string `beam:"word"`
+}
+
+// castFn converts the result of the query, which has pointer fields, into the original TestRow
+// type that was written to BigQuery.
+func castFn(elm TestRowPtrs) TestRow {
+       return TestRow{
+               Counter: *elm.Counter,
+               Rand_data: RandData{
+                       Flip: *elm.Rand_data.Flip,
+                       Num:  *elm.Rand_data.Num,
+                       Word: *elm.Rand_data.Word,
+               },
+       }
+}
+
+// ReadFromQueryPipeline creates a pipeline that reads elements from a BigQuery table via a SQL
+// query, and asserts that they match elements created by createFn.
+func ReadFromQueryPipeline(expansionAddr, table string, createFn interface{}) *beam.Pipeline {
+       p := beam.NewPipeline()
+       s := p.Root()
+
+       // Read from table and compare to generated elements.
+       rows := beam.ParDo(s, createFn, beam.Impulse(s))
+       inType := reflect.TypeOf((*TestRowPtrs)(nil)).Elem()
+       query := fmt.Sprintf("SELECT * FROM `%s`", table)
+       readRows := bigqueryio.Read(s, inType,
+               bigqueryio.FromQuery(query),
+               bigqueryio.ReadExpansionAddr(expansionAddr))
+       castRows := beam.ParDo(s, castFn, readRows)
+       passert.Equals(s, castRows, rows)
+
+       return p
+}
+
+// TestBigQueryIO_BasicWriteRead runs a pipeline that generates semi-randomized elements, writes
+// them to a BigQuery table and then reads from that table, and checks that the result matches the
+// original inputs. This requires the table to be created before the pipeline runs.
+func TestBigQueryIO_BasicWriteRead(t *testing.T) {
+       integration.CheckFilters(t)
+       checkFlags(t)
+
+       // Create a table before running the pipeline.
+       table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
+       if err != nil {
+               t.Fatalf("error creating BigQuery table: %v", err)
+       }
+       t.Logf("Created BigQuery table %v", table)
+
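+       // A single seed is shared by both pipelines so that the read pipeline
+       // regenerates exactly the rows the write pipeline inserted.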
+       createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
+       write := WritePipeline(expansionAddr, table, createTestRows)
+       ptest.RunAndValidate(t, write)
+       read := ReadPipeline(expansionAddr, table, createTestRows)
+       ptest.RunAndValidate(t, read)
+
+       t.Logf("Deleting BigQuery table %v", table)
+       err = deleteTempTable(table)
+       if err != nil {
+               t.Logf("Error deleting BigQuery table: %v", err)
+       }
+}
+
+// TestBigQueryIO_BasicWriteQueryRead runs a pipeline that generates semi-randomized elements,
+// writes them to a BigQuery table and then reads from that table, and checks that the result
+// matches the original inputs. This requires the table to be created before the pipeline runs.
+//
+// This test reads via a BigQuery SQL query, instead of directly from a table.
+func TestBigQueryIO_BasicWriteQueryRead(t *testing.T) {
+       integration.CheckFilters(t)
+       checkFlags(t)
+
+       // Create a table before running the pipeline.
+       table, err := newTempTable(*integration.BigQueryDataset, "go_bqio_it", ddlTestRowSchema)
+       if err != nil {
+               t.Fatalf("error creating BigQuery table: %v", err)
+       }
+       t.Logf("Created BigQuery table %v", table)
+
+       createTestRows := &CreateTestRowsFn{seed: time.Now().UnixNano()}
+       write := WritePipeline(expansionAddr, table, createTestRows)
+       ptest.RunAndValidate(t, write)
+       readQuery := ReadFromQueryPipeline(expansionAddr, table, createTestRows)
+       ptest.RunAndValidate(t, readQuery)
+
+       t.Logf("Deleting BigQuery table %v", table)
+       err = deleteTempTable(table)
+       if err != nil {
+               t.Logf("Error deleting BigQuery table: %v", err)
+       }
+}
+
+func TestMain(m *testing.M) {
+       flag.Parse()
+       beam.Init()
+
+       services := integration.NewExpansionServices()
+       defer func() { services.Shutdown() }()
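+       // "gcpio" is the label paired with the GCP expansion service jar by the
+       // --expansion_jar flag in sdks/go/test/build.gradle.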
+       addr, err := services.GetAddr("gcpio")
+       if err != nil {
+               log.Printf("skipping missing expansion service: %v", err)
+       } else {
+               expansionAddr = addr
+       }
+
+       ptest.MainRet(m)
+}
diff --git a/sdks/go/test/integration/io/xlang/bigquery/table.go b/sdks/go/test/integration/io/xlang/bigquery/table.go
new file mode 100644
index 00000000000..7d5f3dca377
--- /dev/null
+++ b/sdks/go/test/integration/io/xlang/bigquery/table.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bigquery
+
+import (
+       "fmt"
+       "os/exec"
+       "time"
+)
+
+// newTempTable creates a new BigQuery table using BigQuery's Data Definition Language (DDL) and
+// the "bq query" console command. Reference:
+// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
+// The tables created are set to expire after a day.
+//
+// newTempTable takes the name of a BigQuery dataset, the prefix for naming the table, and a DDL
+// schema for the data, and generates that table with a unique suffix and an expiration time of a
+// day later.
+func newTempTable(dataset, prefix, schema string) (string, error) {
+       name := fmt.Sprintf("%s.%s_temp_%v", dataset, prefix, time.Now().UnixNano())
+       query := fmt.Sprintf("CREATE TABLE `%s`(%s) OPTIONS(expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))", name, schema)
+       cmd := exec.Command("bq", "query", "--use_legacy_sql=false", query)
+       out, err := cmd.CombinedOutput()
+       if err != nil {
+               return "", fmt.Errorf("creating table through command \"%s\" 
failed with output:\n%s", cmd.String(), out)
+       }
+       return name, nil
+}
+
+// deleteTempTable deletes a BigQuery table using BigQuery's Data Definition Language (DDL) and
+// the "bq query" console command. Reference:
+// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
+func deleteTempTable(table string) error {
+       query := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", table)
+       cmd := exec.Command("bq", "query", "--use_legacy_sql=false", query)
+       out, err := cmd.CombinedOutput()
+       if err != nil {
+               return fmt.Errorf("deleting table through command \"%s\" failed with output:\n%s", cmd.String(), out)
+       }
+       return nil
+}
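
For reference, with the default dataset wired in through build.gradle and the "go_bqio_it" prefix used by the tests, newTempTable emits DDL along these lines (the numeric suffix is the creation time in nanoseconds):

    CREATE TABLE `apache-beam-testing.beam_bigquery_io_test_temp.go_bqio_it_temp_<nanos>`(
        counter INT64 NOT NULL,
        rand_data STRUCT<flip BOOL NOT NULL, num INT64 NOT NULL, word STRING NOT NULL> NOT NULL
    ) OPTIONS(expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY))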
