reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3126012380
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -2513,6 +2515,180 @@ public void updateTableSchemaTest(boolean useSet)
throws Exception {
Iterables.concat(expectedDroppedValues, expectedFullValues),
TableRow.class)));
}
+ @Test
+ public void testAutoPatchTableSchemaTest() throws Exception {
+ assumeTrue(useStreaming);
+ assumeTrue(useStorageApi);
+
+ // Make sure that GroupIntoBatches does not buffer data.
+
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(1);
+ p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(1);
+
p.getOptions().as(BigQueryOptions.class).setSchemaUpgradeBufferingShards(2);
+
+ BigQueryIO.Write.Method method =
+ useStorageApiApproximate ? Method.STORAGE_API_AT_LEAST_ONCE :
Method.STORAGE_WRITE_API;
+ p.enableAbandonedNodeEnforcement(false);
+
+ TableReference tableRef =
BigQueryHelpers.parseTableSpec("project-id:dataset-id.table");
+ TableSchema tableSchema =
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("number").setType("INT64"),
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new
TableFieldSchema().setName("req").setType("STRING").setMode("REQUIRED")));
+ fakeDatasetService.createTable(new
Table().setTableReference(tableRef).setSchema(tableSchema));
+
+ final int stride = 5;
+ Function<Integer, TableSchema> getUpdatedSchema =
+ currentStride -> {
+ TableSchema tableSchemaUpdated = new TableSchema();
+ tableSchemaUpdated.setFields(
+ Lists.newArrayList(
+ new
TableFieldSchema().setName("number").setType("INT64").setMode("NULLABLE"),
+ new
TableFieldSchema().setName("name").setType("STRING").setMode("NULLABLE"),
+ new
TableFieldSchema().setName("req").setType("STRING").setMode("NULLABLE"),
+ new
TableFieldSchema().setName("new1").setType("STRING").setMode("NULLABLE"),
+ new
TableFieldSchema().setName("new2").setType("STRING").setMode("NULLABLE")));
+
+ if (currentStride >= 2) {
+ List<TableFieldSchema> nestedFields =
+ Lists.newArrayList(
+ new TableFieldSchema()
+ .setName("nested_field1")
+ .setType("STRING")
+ .setMode("NULLABLE"),
+ new TableFieldSchema()
+ .setName("nested_field2")
+ .setType("STRING")
+ .setMode("NULLABLE"));
+ if (currentStride >= 3) {
+ List<TableFieldSchema> doubleNestedFields =
+ Lists.newArrayList(
+ new TableFieldSchema()
+ .setName("double_nested_field1")
+ .setType("STRING")
+ .setMode("NULLABLE"));
+ nestedFields.add(
+ new TableFieldSchema()
+ .setName("double_nested")
+ .setType("STRUCT")
+ .setMode("NULLABLE")
+ .setFields(doubleNestedFields));
+
+ List<TableFieldSchema> repeatedNestedFields =
+ Lists.newArrayList(
+ new TableFieldSchema()
+ .setName("repeated_nested_field1")
+ .setType("STRING")
+ .setMode("NULLABLE"),
+ new TableFieldSchema()
+ .setName("repeated_nested_field2")
+ .setType("STRING")
+ .setMode("NULLABLE"));
+
+ nestedFields.add(
+ new TableFieldSchema()
+ .setName("repeated_nested")
+ .setType("STRUCT")
+ .setMode("REPEATED")
+ .setFields(repeatedNestedFields));
+ }
+ tableSchemaUpdated
+ .getFields()
+ .add(
+ new TableFieldSchema()
+ .setName("nested")
+ .setType("STRUCT")
+ .setMode("NULLABLE")
+ .setFields(nestedFields));
+ }
+ return tableSchemaUpdated;
+ };
+
+ IntFunction<TableRow> getRow =
+ (IntFunction<TableRow> & Serializable)
+ (int i) -> {
+ TableRow row = new TableRow().set("name", "name" +
i).set("number", Long.toString(i));
+ if (i < stride) {
+ row = row.set("req", "foo");
+ } else {
+ row = row.set("new1", "blah" + i);
+ row = row.set("new2", "baz" + i);
+
+ if (i >= 2 * stride) {
+ TableRow nested =
+ new TableRow()
+ .set("nested_field1", "nested1" + i)
+ .set("nested_field2", "nested2" + i);
+
+ if (i >= 3 * stride) {
+ TableRow doubleNested =
+ new TableRow().set("double_nested_field1",
"double_nested1" + i);
+ nested = nested.set("double_nested", doubleNested);
+
+ // Add a repeated struct to ensure that we capture this
code path as well.
+ TableRow repeatedNested1 =
+ new TableRow().set("repeated_nested_field1",
"repeated_nested1" + i);
+ TableRow repeatedNested2 =
+ new TableRow().set("repeated_nested_field2",
"repeated_nested2" + i);
+ nested =
+ nested.set(
+ "repeated_nested",
ImmutableList.of(repeatedNested1, repeatedNested2));
+ }
+ row.set("nested", nested);
+ }
+ }
+ return row;
+ };
+
+ TestStream.Builder<TableRow> testStream =
+ TestStream.create(TableRowJsonCoder.of()).advanceWatermarkTo(new
Instant(0));
+ List<TableRow> expectedRows = Lists.newArrayList();
+ for (int i = 0; i < 20; i += stride) {
+ for (int j = i; j < i + stride; ++j) {
+ TableRow tableRow = getRow.apply(j);
+ expectedRows.add(tableRow);
+ testStream = testStream.addElements(tableRow);
+ }
+ if (i > 0 && (i % 5) == 0) {
+ for (int n = 0; n < 5; ++n) {
+ testStream =
testStream.advanceProcessingTime(Duration.standardSeconds(2));
+ }
+ }
+ }
+ for (int i = 0; i < 5; ++i) {
+ testStream =
testStream.advanceProcessingTime(Duration.standardSeconds(2));
+ }
Review Comment:
We're trying to force some processing-time timer expiration so we aren't
just relying on OnWindowExpiration. It's not great, because TestStream isn't
well suited to testing processing-time timers. Ideally we'd be able to advance
the clock _after_ the job has started, and then wait for a signal that the
timer has expired. However, TestStream requires these advancements to be placed
deterministically in the input stream. This approach does add some test
coverage, but it's not very deterministic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]