This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 103253348ec Add file loads streaming integration tests (#28312)
103253348ec is described below

commit 103253348ec3f1e193bb124e52dffcc603c929d9
Author: Ahmed Abualsaud <65791736+ahmedab...@users.noreply.github.com>
AuthorDate: Wed Sep 13 15:34:59 2023 +0000

    Add file loads streaming integration tests (#28312)
    
    * file loads streaming integration tests
    
    * fix dynamic destinations copy jobs
    
    * disable for runnerV2 until pane index is fixed
---
 runners/google-cloud-dataflow-java/build.gradle    |   3 +
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java       |  11 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    |   4 +
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     |  27 +-
 .../sdk/io/gcp/bigquery/FileLoadsStreamingIT.java  | 497 +++++++++++++++++++++
 5 files changed, 532 insertions(+), 10 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index f6e2b9b147c..2acc30455e2 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -612,6 +612,9 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) 
{
   exclude '**/FhirIOLROIT.class'
   exclude '**/FhirIOSearchIT.class'
   exclude '**/FhirIOPatientEverythingIT.class'
+  // failing due to pane index not incrementing after Reshuffle:
+  // https://github.com/apache/beam/issues/28219
+  exclude '**/FileLoadsStreamingIT.class'
 
   maxParallelForks 4
   classpath = configurations.googleCloudPlatformIntegrationTest
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index f5f193aecb7..32ee29738bf 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -399,10 +398,12 @@ class BatchLoads<DestinationT, ElementT>
                 "Window Into Global Windows",
                 Window.<KV<DestinationT, WriteTables.Result>>into(new 
GlobalWindows())
                     
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
-            .apply("Add Void Key", WithKeys.of((Void) null))
-            .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
-            .apply("GroupByKey", GroupByKey.create())
-            .apply("Extract Values", Values.create())
+            // We use this and the following GBK to aggregate by final 
destination.
+            // This way, each destination has its own pane sequence
+            .apply("AddDestinationKeys", WithKeys.of(result -> 
result.getKey()))
+            .setCoder(KvCoder.of(destinationCoder, tempTables.getCoder()))
+            .apply("GroupTempTablesByFinalDestination", GroupByKey.create())
+            .apply("ExtractTempTables", Values.create())
             .apply(
                 ParDo.of(
                         new UpdateSchemaDestination<DestinationT>(
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 0063952d8b1..00ee815c3c9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -689,6 +689,10 @@ public class BigQueryUtils {
       }
     }
 
+    if (jsonBQValue instanceof byte[] && fieldType.getTypeName() == 
TypeName.BYTES) {
+      return jsonBQValue;
+    }
+
     if (jsonBQValue instanceof List) {
       if (fieldType.getCollectionElementType() == null) {
         throw new IllegalArgumentException(
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 9bff77a1658..f4074cc1a55 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -87,6 +87,7 @@ public class BigQueryUtilsTest {
           .addNullableField("time0s_0ns", 
Schema.FieldType.logicalType(SqlTypes.TIME))
           .addNullableField("valid", Schema.FieldType.BOOLEAN)
           .addNullableField("binary", Schema.FieldType.BYTES)
+          .addNullableField("raw_bytes", Schema.FieldType.BYTES)
           .addNullableField("numeric", Schema.FieldType.DECIMAL)
           .addNullableField("boolean", Schema.FieldType.BOOLEAN)
           .addNullableField("long", Schema.FieldType.INT64)
@@ -188,6 +189,9 @@ public class BigQueryUtilsTest {
   private static final TableFieldSchema BINARY =
       new 
TableFieldSchema().setName("binary").setType(StandardSQLTypeName.BYTES.toString());
 
+  private static final TableFieldSchema RAW_BYTES =
+      new 
TableFieldSchema().setName("raw_bytes").setType(StandardSQLTypeName.BYTES.toString());
+
   private static final TableFieldSchema NUMERIC =
       new 
TableFieldSchema().setName("numeric").setType(StandardSQLTypeName.NUMERIC.toString());
 
@@ -246,6 +250,7 @@ public class BigQueryUtilsTest {
                   TIME_0S_0NS,
                   VALID,
                   BINARY,
+                  RAW_BYTES,
                   NUMERIC,
                   BOOLEAN,
                   LONG,
@@ -276,6 +281,7 @@ public class BigQueryUtilsTest {
                   TIME_0S_0NS,
                   VALID,
                   BINARY,
+                  RAW_BYTES,
                   NUMERIC,
                   BOOLEAN,
                   LONG,
@@ -316,6 +322,7 @@ public class BigQueryUtilsTest {
               LocalTime.parse("12:34"),
               false,
               Base64.getDecoder().decode("ABCD1234"),
+              Base64.getDecoder().decode("ABCD1234"),
               new BigDecimal("123.456").setScale(3, RoundingMode.HALF_UP),
               true,
               123L,
@@ -346,6 +353,7 @@ public class BigQueryUtilsTest {
           .set("time0s_0ns", "12:34:00")
           .set("valid", "false")
           .set("binary", "ABCD1234")
+          .set("raw_bytes", Base64.getDecoder().decode("ABCD1234"))
           .set("numeric", "123.456")
           .set("boolean", true)
           .set("long", 123L)
@@ -355,7 +363,7 @@ public class BigQueryUtilsTest {
       Row.withSchema(FLAT_TYPE)
           .addValues(
               null, null, null, null, null, null, null, null, null, null, 
null, null, null, null,
-              null, null, null, null, null, null, null, null)
+              null, null, null, null, null, null, null, null, null)
           .build();
 
   private static final TableRow BQ_NULL_FLAT_ROW =
@@ -378,6 +386,7 @@ public class BigQueryUtilsTest {
           .set("time0s_0ns", null)
           .set("valid", null)
           .set("binary", null)
+          .set("raw_bytes", null)
           .set("numeric", null)
           .set("boolean", null)
           .set("long", null)
@@ -457,6 +466,7 @@ public class BigQueryUtilsTest {
                   TIME_0S_0NS,
                   VALID,
                   BINARY,
+                  RAW_BYTES,
                   NUMERIC,
                   BOOLEAN,
                   LONG,
@@ -512,6 +522,7 @@ public class BigQueryUtilsTest {
             TIME_0S_0NS,
             VALID,
             BINARY,
+            RAW_BYTES,
             NUMERIC,
             BOOLEAN,
             LONG,
@@ -562,6 +573,7 @@ public class BigQueryUtilsTest {
             TIME_0S_0NS,
             VALID,
             BINARY,
+            RAW_BYTES,
             NUMERIC,
             BOOLEAN,
             LONG,
@@ -598,6 +610,7 @@ public class BigQueryUtilsTest {
             TIME_0S_0NS,
             VALID,
             BINARY,
+            RAW_BYTES,
             NUMERIC,
             BOOLEAN,
             LONG,
@@ -620,7 +633,7 @@ public class BigQueryUtilsTest {
   public void testToTableRow_flat() {
     TableRow row = toTableRow().apply(FLAT_ROW);
 
-    assertThat(row.size(), equalTo(22));
+    assertThat(row.size(), equalTo(23));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
@@ -635,6 +648,7 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("name", "test"));
     assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
+    assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
     assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
@@ -674,7 +688,7 @@ public class BigQueryUtilsTest {
 
     assertThat(row.size(), equalTo(1));
     row = (TableRow) row.get("row");
-    assertThat(row.size(), equalTo(22));
+    assertThat(row.size(), equalTo(23));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
@@ -689,6 +703,7 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("name", "test"));
     assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
+    assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
     assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
@@ -701,7 +716,7 @@ public class BigQueryUtilsTest {
 
     assertThat(row.size(), equalTo(1));
     row = ((List<TableRow>) row.get("rows")).get(0);
-    assertThat(row.size(), equalTo(22));
+    assertThat(row.size(), equalTo(23));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("datetime", "2020-11-02T12:34:56.789876"));
@@ -716,6 +731,7 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("name", "test"));
     assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
+    assertThat(row, hasEntry("raw_bytes", "ABCD1234"));
     assertThat(row, hasEntry("numeric", "123.456"));
     assertThat(row, hasEntry("boolean", "true"));
     assertThat(row, hasEntry("long", "123"));
@@ -726,7 +742,7 @@ public class BigQueryUtilsTest {
   public void testToTableRow_null_row() {
     TableRow row = toTableRow().apply(NULL_FLAT_ROW);
 
-    assertThat(row.size(), equalTo(22));
+    assertThat(row.size(), equalTo(23));
     assertThat(row, hasEntry("id", null));
     assertThat(row, hasEntry("value", null));
     assertThat(row, hasEntry("name", null));
@@ -745,6 +761,7 @@ public class BigQueryUtilsTest {
     assertThat(row, hasEntry("time0s_0ns", null));
     assertThat(row, hasEntry("valid", null));
     assertThat(row, hasEntry("binary", null));
+    assertThat(row, hasEntry("raw_bytes", null));
     assertThat(row, hasEntry("numeric", null));
     assertThat(row, hasEntry("boolean", null));
     assertThat(row, hasEntry("long", null));
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java
new file mode 100644
index 00000000000..012afed6fb4
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FileLoadsStreamingIT.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.ValueInSingleWindow;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class FileLoadsStreamingIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileLoadsStreamingIT.class);
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return ImmutableList.of(new Object[] {false}, new Object[] {true});
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useInputSchema;
+
+  @Rule public TestName testName = new TestName();
+
+  private static final BigqueryClient BQ_CLIENT = new 
BigqueryClient("FileLoadsStreamingIT");
+  private static final String PROJECT =
+      TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+  private static final String BIG_QUERY_DATASET_ID = 
"file_loads_streaming_it_" + System.nanoTime();
+
+  private static final String[] FIELDS = {
+    "BOOL",
+    "BOOLEAN",
+    "BYTES",
+    "INT64",
+    "INTEGER",
+    "FLOAT",
+    "FLOAT64",
+    "NUMERIC",
+    "STRING",
+    "DATE",
+    "TIMESTAMP"
+  };
+
+  private static final int TOTAL_N = 50;
+
+  private final Random randomGenerator = new Random();
+
+  @BeforeClass
+  public static void setUpTestEnvironment() throws IOException, 
InterruptedException {
+    // Create one BQ dataset for all test cases.
+    cleanUp();
+    BQ_CLIENT.createNewDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    BQ_CLIENT.deleteDataset(PROJECT, BIG_QUERY_DATASET_ID);
+  }
+
+  static class GenerateRowFunc implements SerializableFunction<Long, TableRow> 
{
+    private final List<String> fieldNames;
+
+    public GenerateRowFunc(List<String> fieldNames) {
+      this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public TableRow apply(Long rowId) {
+      TableRow row = new TableRow();
+      row.set("id", rowId);
+
+      for (String name : fieldNames) {
+        String type = Iterables.get(Splitter.on('_').split(name), 0);
+        switch (type) {
+          case "BOOL":
+          case "BOOLEAN":
+            if (rowId % 2 == 0) {
+              row.set(name, false);
+            } else {
+              row.set(name, true);
+            }
+            break;
+          case "BYTES":
+            row.set(name, String.format("test_blob_%s", 
rowId).getBytes(StandardCharsets.UTF_8));
+            break;
+          case "INT64":
+          case "INTEGER":
+            row.set(name, String.valueOf(rowId + 10));
+            break;
+          case "FLOAT":
+          case "FLOAT64":
+            row.set(name, String.valueOf(0.5 + rowId));
+            break;
+          case "NUMERIC":
+            row.set(name, String.valueOf(rowId + 0.12345));
+            break;
+          case "DATE":
+            row.set(name, "2022-01-01");
+            break;
+          case "TIMESTAMP":
+            row.set(name, "2022-01-01 10:10:10.012 UTC");
+            break;
+          case "STRING":
+            row.set(name, "test_string" + rowId);
+            break;
+          default:
+            row.set(name, "unknown" + rowId);
+            break;
+        }
+      }
+      return row;
+    }
+  }
+
+  private static TableSchema makeTableSchemaFromTypes(List<String> fieldNames) 
{
+    ImmutableList.Builder<TableFieldSchema> builder = 
ImmutableList.<TableFieldSchema>builder();
+
+    // Add an id field for verification of correctness
+    builder.add(new 
TableFieldSchema().setType("INTEGER").setName("id").setMode("REQUIRED"));
+
+    // the name is prefix with type_.
+    for (String name : fieldNames) {
+      String mode = "REQUIRED";
+      builder.add(new 
TableFieldSchema().setType(name).setName(name).setMode(mode));
+    }
+
+    return new TableSchema().setFields(builder.build());
+  }
+
+  private String maybeCreateTable(TableSchema tableSchema, String suffix)
+      throws IOException, InterruptedException {
+    String tableId = 
Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0);
+
+    BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId + suffix);
+    if (!useInputSchema) {
+      BQ_CLIENT.createNewTable(
+          PROJECT,
+          BIG_QUERY_DATASET_ID,
+          new Table()
+              .setSchema(tableSchema)
+              .setTableReference(
+                  new TableReference()
+                      .setTableId(tableId + suffix)
+                      .setDatasetId(BIG_QUERY_DATASET_ID)
+                      .setProjectId(PROJECT)));
+    } else {
+      tableId += "WithInputSchema";
+    }
+    return String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, tableId + 
suffix);
+  }
+
+  private void runStreaming(int numFileShards, boolean useCopyJobs)
+      throws IOException, InterruptedException {
+    TestPipelineOptions opts = 
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+    opts.setTempLocation(opts.getTempRoot());
+    Pipeline p = Pipeline.create(opts);
+
+    // Only run the most relevant test case on Dataflow.
+    // Testing this dimension on DirectRunner is sufficient
+    if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
+      assumeTrue("Skipping in favor of more relevant test case", 
useInputSchema);
+      // Need to manually enable streaming engine for legacy dataflow runner
+      ExperimentalOptions.addExperiment(
+          p.getOptions().as(ExperimentalOptions.class), 
GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    }
+
+    List<String> fieldNamesOrigin = Arrays.asList(FIELDS);
+    // Shuffle the fields in the write schema to do fuzz testing on field order
+    List<String> fieldNamesShuffled = new ArrayList<String>(fieldNamesOrigin);
+    Collections.shuffle(fieldNamesShuffled, randomGenerator);
+
+    TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin);
+    TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled);
+    String tableSpec = maybeCreateTable(bqTableSchema, "");
+
+    // set up and build pipeline
+    Instant start = new Instant(0);
+    GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesShuffled);
+    PCollection<Instant> instants =
+        p.apply(
+            "Generate Instants",
+            PeriodicImpulse.create()
+                .startAt(start)
+                .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1)))
+                .withInterval(Duration.standardSeconds(1))
+                .catchUpToNow(false));
+    PCollection<TableRow> rows =
+        instants.apply(
+            "Create TableRows",
+            MapElements.into(TypeDescriptor.of(TableRow.class))
+                .via(instant -> generateRowFunc.apply(instant.getMillis() / 
1000)));
+    // build write transform
+    Write<TableRow> write =
+        BigQueryIO.writeTableRows()
+            .to(tableSpec)
+            .withMethod(Write.Method.FILE_LOADS)
+            .withTriggeringFrequency(Duration.standardSeconds(10));
+    if (useCopyJobs) {
+      write = write.withMaxBytesPerPartition(250);
+    }
+    if (useInputSchema) {
+      // we're creating the table with the input schema
+      write =
+          write
+              .withSchema(inputSchema)
+              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+              .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
+    } else {
+      // table already exists with a schema, no need to create it
+      write =
+          write
+              .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+              .withWriteDisposition(WriteDisposition.WRITE_APPEND);
+    }
+    write = numFileShards == 0 ? write.withAutoSharding() : 
write.withNumFileShards(numFileShards);
+
+    rows.apply("Stream loads to BigQuery", write);
+    p.run().waitUntilFinish();
+
+    List<TableRow> expectedRows = new ArrayList<>();
+    for (long i = 0; i < TOTAL_N; i++) {
+      expectedRows.add(generateRowFunc.apply(i));
+    }
+
+    // Perform checks
+    checkRowCompleteness(tableSpec, inputSchema, expectedRows);
+  }
+
+  // Check that the expected rows reached the table.
+  private static void checkRowCompleteness(
+      String tableSpec, TableSchema schema, List<TableRow> expectedRows)
+      throws IOException, InterruptedException {
+    List<TableRow> actualTableRows =
+        BQ_CLIENT.queryUnflattened(
+            String.format("SELECT * FROM [%s]", tableSpec), PROJECT, true, 
false);
+
+    Schema rowSchema = BigQueryUtils.fromTableSchema(schema);
+    List<Row> actualBeamRows =
+        actualTableRows.stream()
+            .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow))
+            .collect(Collectors.toList());
+    List<Row> expectedBeamRows =
+        expectedRows.stream()
+            .map(tableRow -> BigQueryUtils.toBeamRow(rowSchema, tableRow))
+            .collect(Collectors.toList());
+    LOG.info(
+        "Actual rows number: {}, expected: {}", actualBeamRows.size(), 
expectedBeamRows.size());
+
+    assertThat(
+        "Comparing expected rows with actual rows",
+        actualBeamRows,
+        containsInAnyOrder(expectedBeamRows.toArray()));
+    assertEquals(
+        "Checking there is no duplication", expectedBeamRows.size(), 
actualBeamRows.size());
+  }
+
+  @Test
+  public void testLoadWithFixedShards() throws IOException, 
InterruptedException {
+    runStreaming(5, false);
+  }
+
+  @Test
+  public void testLoadWithAutoShardingAndCopyJobs() throws IOException, 
InterruptedException {
+    runStreaming(0, true);
+  }
+
+  @Test
+  public void testDynamicDestinationsWithFixedShards() throws IOException, 
InterruptedException {
+    runStreamingToDynamicDestinations(6, false);
+  }
+
+  @Test
+  public void testDynamicDestinationsWithAutoShardingAndCopyJobs()
+      throws IOException, InterruptedException {
+    runStreamingToDynamicDestinations(0, true);
+  }
+
+  private void runStreamingToDynamicDestinations(int numFileShards, boolean 
useCopyJobs)
+      throws IOException, InterruptedException {
+    TestPipelineOptions opts = 
TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
+    opts.setTempLocation(opts.getTempRoot());
+    Pipeline p = Pipeline.create(opts);
+    // Only run the most relevant test cases on Dataflow. Testing this 
dimension on DirectRunner is
+    // sufficient
+    if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
+      assumeTrue("Skipping in favor of more relevant test case", 
useInputSchema);
+      // Need to manually enable streaming engine for legacy dataflow runner
+      ExperimentalOptions.addExperiment(
+          p.getOptions().as(ExperimentalOptions.class), 
GcpOptions.STREAMING_ENGINE_EXPERIMENT);
+    }
+
+    List<String> allFields = Arrays.asList(FIELDS);
+    List<String> subFields0 = new ArrayList<>(allFields.subList(0, 4));
+    List<String> subFields1 = new ArrayList<>(allFields.subList(4, 8));
+    List<String> subFields2 = new ArrayList<>(allFields.subList(8, 11));
+    TableSchema table0Schema = makeTableSchemaFromTypes(subFields0);
+    TableSchema table1Schema = makeTableSchemaFromTypes(subFields1);
+    TableSchema table2Schema = makeTableSchemaFromTypes(subFields2);
+    String table0Id = maybeCreateTable(table0Schema, "-0");
+    String table1Id = maybeCreateTable(table1Schema, "-1");
+    String table2Id = maybeCreateTable(table2Schema, "-2");
+    GenerateRowFunc generateRowFunc0 = new GenerateRowFunc(subFields0);
+    GenerateRowFunc generateRowFunc1 = new GenerateRowFunc(subFields1);
+    GenerateRowFunc generateRowFunc2 = new GenerateRowFunc(subFields2);
+
+    String tablePrefix = table0Id.substring(0, table0Id.length() - 2);
+
+    // set up and build pipeline
+    Instant start = new Instant(0);
+    PCollection<Instant> instants =
+        p.apply(
+            "Generate Instants",
+            PeriodicImpulse.create()
+                .startAt(start)
+                .stopAt(start.plus(Duration.standardSeconds(TOTAL_N - 1)))
+                .withInterval(Duration.standardSeconds(1))
+                .catchUpToNow(false));
+    PCollection<Long> longs =
+        instants.apply(
+            "Create TableRows",
+            MapElements.into(TypeDescriptors.longs()).via(instant -> 
instant.getMillis() / 1000));
+    // build write transform
+    Write<Long> write =
+        BigQueryIO.<Long>write()
+            .to(
+                new TestDynamicDest(
+                    tablePrefix, subFields0, subFields1, subFields2, 
useInputSchema))
+            .withFormatFunction(
+                id -> {
+                  long dest = id % 3;
+                  TableRow row;
+                  if (dest == 0) {
+                    row = generateRowFunc0.apply(id);
+                  } else if (dest == 1) {
+                    row = generateRowFunc1.apply(id);
+                  } else {
+                    row = generateRowFunc2.apply(id);
+                  }
+                  return row;
+                })
+            .withMethod(Write.Method.FILE_LOADS)
+            .withTriggeringFrequency(Duration.standardSeconds(10));
+    if (useCopyJobs) {
+      write = write.withMaxBytesPerPartition(150);
+    }
+    if (useInputSchema) {
+      // we're creating the table with the input schema
+      write =
+          write
+              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+              .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
+    } else {
+      // table already exists with a schema, no need to create it
+      write =
+          write
+              .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+              .withWriteDisposition(WriteDisposition.WRITE_APPEND);
+    }
+    write = numFileShards == 0 ? write.withAutoSharding() : 
write.withNumFileShards(numFileShards);
+
+    longs.apply("Stream loads to dynamic destinations", write);
+    p.run().waitUntilFinish();
+
+    List<TableRow> expectedRows0 = new ArrayList<>();
+    List<TableRow> expectedRows1 = new ArrayList<>();
+    List<TableRow> expectedRows2 = new ArrayList<>();
+    for (long i = 0; i < TOTAL_N; i++) {
+      long dest = i % 3;
+      if (dest == 0) {
+        expectedRows0.add(generateRowFunc0.apply(i));
+      } else if (dest == 1) {
+        expectedRows1.add(generateRowFunc1.apply(i));
+      } else {
+        expectedRows2.add(generateRowFunc2.apply(i));
+      }
+    }
+    // Perform checks
+    checkRowCompleteness(table0Id, makeTableSchemaFromTypes(subFields0), 
expectedRows0);
+    checkRowCompleteness(table1Id, makeTableSchemaFromTypes(subFields1), 
expectedRows1);
+    checkRowCompleteness(table2Id, makeTableSchemaFromTypes(subFields2), 
expectedRows2);
+  }
+
+  static class TestDynamicDest extends DynamicDestinations<Long, Long> {
+    String tablePrefix;
+    List<String> table0Fields;
+    List<String> table1Fields;
+    List<String> table2Fields;
+    boolean useInputSchema;
+
+    public TestDynamicDest(
+        String tablePrefix,
+        List<String> table0Fields,
+        List<String> table1Fields,
+        List<String> table2Fields,
+        boolean useInputSchema) {
+      this.tablePrefix = tablePrefix;
+      this.table0Fields = table0Fields;
+      this.table1Fields = table1Fields;
+      this.table2Fields = table2Fields;
+      this.useInputSchema = useInputSchema;
+    }
+
+    @Override
+    public Long getDestination(@Nullable ValueInSingleWindow<Long> element) {
+      return element.getValue() % 3;
+    }
+
+    @Override
+    public TableDestination getTable(Long destination) {
+      return new TableDestination(tablePrefix + "-" + destination, null);
+    }
+
+    @Override
+    public @Nullable TableSchema getSchema(Long destination) {
+      if (!useInputSchema) {
+        return null;
+      }
+      List<String> fields;
+      if (destination == 0) {
+        fields = table0Fields;
+      } else if (destination == 1) {
+        fields = table1Fields;
+      } else {
+        fields = table2Fields;
+      }
+      List<TableFieldSchema> tableFields =
+          fields.stream()
+              .map(name -> new 
TableFieldSchema().setName(name).setType(name).setMode("REQUIRED"))
+              .collect(Collectors.toList());
+      // we attach an ID to each row in addition to the existing schema fields
+      tableFields.add(
+          0, new 
TableFieldSchema().setName("id").setType("INTEGER").setMode("REQUIRED"));
+      return new TableSchema().setFields(tableFields);
+    }
+  }
+}

Reply via email to