reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3126012380


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -2513,6 +2515,180 @@ public void updateTableSchemaTest(boolean useSet) 
throws Exception {
                 Iterables.concat(expectedDroppedValues, expectedFullValues), 
TableRow.class)));
   }
 
+  @Test
+  public void testAutoPatchTableSchemaTest() throws Exception {
+    assumeTrue(useStreaming);
+    assumeTrue(useStorageApi);
+
+    // Make sure that GroupIntoBatches does not buffer data.
+    
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(1);
+    p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(1);
+    
p.getOptions().as(BigQueryOptions.class).setSchemaUpgradeBufferingShards(2);
+
+    BigQueryIO.Write.Method method =
+        useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE : 
Method.STORAGE_WRITE_API;
+    p.enableAbandonedNodeEnforcement(false);
+
+    TableReference tableRef = 
BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
+    TableSchema tableSchema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("number").setType("INT64"),
+                    new TableFieldSchema().setName("name").setType("STRING"),
+                    new 
TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
+    fakeDatasetService.createTable(new 
Table().setTableReference(tableRef).setSchema(tableSchema));
+
+    final int stride = 5;
+    Function<Integer, TableSchema> getUpdatedSchema =
+        currentStride -> {
+          TableSchema tableSchemaUpdated = new TableSchema();
+          tableSchemaUpdated.setFields(
+              Lists.newArrayList(
+                  new 
TableFieldSchema().setName("number").setType("INT64").setMode("NULLABLE"),
+                  new 
TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE"),
+                  new 
TableFieldSchema().setName("req").setType("STRING").setMode("NULLABLE"),
+                  new 
TableFieldSchema().setName("new1").setType("STRING").setMode("NULLABLE"),
+                  new 
TableFieldSchema().setName("new2").setType("STRING").setMode("NULLABLE")));
+
+          if (currentStride >= 2) {
+            List<TableFieldSchema> nestedFields =
+                Lists.newArrayList(
+                    new TableFieldSchema()
+                        .setName("nested_field1")
+                        .setType("STRING")
+                        .setMode("NULLABLE"),
+                    new TableFieldSchema()
+                        .setName("nested_field2")
+                        .setType("STRING")
+                        .setMode("NULLABLE"));
+            if (currentStride >= 3) {
+              List<TableFieldSchema> doubleNestedFields =
+                  Lists.newArrayList(
+                      new TableFieldSchema()
+                          .setName("double_nested_field1")
+                          .setType("STRING")
+                          .setMode("NULLABLE"));
+              nestedFields.add(
+                  new TableFieldSchema()
+                      .setName("double_nested")
+                      .setType("STRUCT")
+                      .setMode("NULLABLE")
+                      .setFields(doubleNestedFields));
+
+              List<TableFieldSchema> repeatedNestedFields =
+                  Lists.newArrayList(
+                      new TableFieldSchema()
+                          .setName("repeated_nested_field1")
+                          .setType("STRING")
+                          .setMode("NULLABLE"),
+                      new TableFieldSchema()
+                          .setName("repeated_nested_field2")
+                          .setType("STRING")
+                          .setMode("NULLABLE"));
+
+              nestedFields.add(
+                  new TableFieldSchema()
+                      .setName("repeated_nested")
+                      .setType("STRUCT")
+                      .setMode("REPEATED")
+                      .setFields(repeatedNestedFields));
+            }
+            tableSchemaUpdated
+                .getFields()
+                .add(
+                    new TableFieldSchema()
+                        .setName("nested")
+                        .setType("STRUCT")
+                        .setMode("NULLABLE")
+                        .setFields(nestedFields));
+          }
+          return tableSchemaUpdated;
+        };
+
+    IntFunction<TableRow> getRow =
+        (IntFunction<TableRow> & Serializable)
+            (int i) -> {
+              TableRow row = new TableRow().set("name", "name" + 
i).set("number", Long.toString(i));
+              if (i < stride) {
+                row = row.set("req", "foo");
+              } else {
+                row = row.set("new1", "blah" + i);
+                row = row.set("new2", "baz" + i);
+
+                if (i >= 2 * stride) {
+                  TableRow nested =
+                      new TableRow()
+                          .set("nested_field1", "nested1" + i)
+                          .set("nested_field2", "nested2" + i);
+
+                  if (i >= 3 * stride) {
+                    TableRow doubleNested =
+                        new TableRow().set("double_nested_field1", 
"double_nested1" + i);
+                    nested = nested.set("double_nested", doubleNested);
+
+                    // Add a repeated struct to ensure that we capture this 
code path as well.
+                    TableRow repeatedNested1 =
+                        new TableRow().set("repeated_nested_field1", 
"repeated_nested1" + i);
+                    TableRow repeatedNested2 =
+                        new TableRow().set("repeated_nested_field2", 
"repeated_nested2" + i);
+                    nested =
+                        nested.set(
+                            "repeated_nested", 
ImmutableList.of(repeatedNested1, repeatedNested2));
+                  }
+                  row.set("nested", nested);
+                }
+              }
+              return row;
+            };
+
+    TestStream.Builder<TableRow> testStream =
+        TestStream.create(TableRowJsonCoder.of()).advanceWatermarkTo(new 
Instant(0));
+    List<TableRow> expectedRows = Lists.newArrayList();
+    for (int i = 0; i < 20; i += stride) {
+      for (int j = i; j < i + stride; ++j) {
+        TableRow tableRow = getRow.apply(j);
+        expectedRows.add(tableRow);
+        testStream = testStream.addElements(tableRow);
+      }
+      if (i > 0 && (i % 5) == 0) {
+        for (int n = 0; n < 5; ++n) {
+          testStream = 
testStream.advanceProcessingTime(Duration.standardSeconds(2));
+        }
+      }
+    }
+    for (int i = 0; i < 5; ++i) {
+      testStream = 
testStream.advanceProcessingTime(Duration.standardSeconds(2));
+    }

Review Comment:
   We're trying to force some processing-time timer expiration so we aren't 
just relying on OnWindowExpiration. It's not great, because TestStream isn't 
well suited to testing processing-time timers. Ideally we'd be able to advance 
the clock _after_ the job has started, and then wait for a signal that the 
timer has expired. However, TestStream wants these advancements to be placed 
deterministically in the input stream. This approach does add some test 
coverage, but it's not very deterministic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to