This is an automated email from the ASF dual-hosted git repository. jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 540ce4e [BEAM-2870] Strips partition decorators when creating/patching tables in batch new ead5d43 This closes #4177: [BEAM-2870] Strips partition decorators when creating/patching tables in batch 540ce4e is described below commit 540ce4ee87973cf82e2246ffd5d686055203069e Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Mon Nov 27 10:28:02 2017 -0800 [BEAM-2870] Strips partition decorators when creating/patching tables in batch --- .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java | 4 +++- .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 12 +++++++++--- .../beam/sdk/io/gcp/bigquery/FakeDatasetService.java | 19 ++++++++++++++----- .../beam/sdk/io/gcp/bigquery/FakeJobService.java | 9 ++++++++- 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java index 9ca253c..7077651 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java @@ -264,7 +264,9 @@ class WriteTables<DestinationT> switch (jobStatus) { case SUCCEEDED: if (tableDescription != null) { - datasetService.patchTableDescription(ref, tableDescription); + datasetService.patchTableDescription( + ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())), + tableDescription); } return; case UNKNOWN: diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 08f7464..31c1781 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -589,7 +589,12 @@ public class BigQueryIOTest implements Serializable { if (streaming) { users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } + + // Use a partition decorator to verify that partition decorators are supported. + final String partitionDecorator = "20171127"; + users.apply( "WriteBigQuery", BigQueryIO.<String>write() @@ -630,7 +635,8 @@ public class BigQueryIOTest implements Serializable { verifySideInputs(); // Each user in it's own table. return new TableDestination( - "dataset-id.userid-" + userId, "table for userid " + userId); + "dataset-id.userid-" + userId + "$" + partitionDecorator, + "table for userid " + userId); } @Override @@ -2528,7 +2534,7 @@ public class BigQueryIOTest implements Serializable { p.apply(Create.of(row1, row2)) .apply( BigQueryIO.writeTableRows() - .to("project-id:dataset-id.table-id$decorator") + .to("project-id:dataset-id.table-id$20171127") .withTestServices(fakeBqServices) .withMethod(Method.STREAMING_INSERTS) .withSchema(schema) @@ -2539,7 +2545,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testTableDecoratorStripping() { assertEquals("project:dataset.table", - BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator")); + BigQueryHelpers.stripPartitionDecorator("project:dataset.table$20171127")); assertEquals("project:dataset.table", BigQueryHelpers.stripPartitionDecorator("project:dataset.table")); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java index ffab123..9609ada 100644 --- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java @@ -96,6 +96,7 @@ class FakeDatasetService implements DatasetService, Serializable { @Override public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { + validateWholeTableReference(tableRef); synchronized (BigQueryIOTest.tables) { Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(tableRef.getProjectId(), tableRef.getDatasetId()); @@ -109,12 +110,13 @@ class FakeDatasetService implements DatasetService, Serializable { } } - - @Override - public void createTable(Table table) throws IOException { + /** + * Validates a table reference for whole-table operations, such as create/delete/patch. Such + * operations do not support partition decorators. + */ + private static void validateWholeTableReference(TableReference tableReference) + throws IOException { final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}"); - - TableReference tableReference = table.getTableReference(); if (!tableRegexp.matcher(tableReference.getTableId()).matches()) { throw new IOException( String.format( @@ -123,6 +125,12 @@ class FakeDatasetService implements DatasetService, Serializable { + " decorators cannot be used.", tableReference.getTableId())); } + } + + @Override + public void createTable(Table table) throws IOException { + TableReference tableReference = table.getTableReference(); + validateWholeTableReference(tableReference); synchronized (BigQueryIOTest.tables) { Map<String, TableContainer> dataset = BigQueryIOTest.tables.get(tableReference.getProjectId(), tableReference.getDatasetId()); @@ -245,6 +253,7 @@ class FakeDatasetService implements DatasetService, Serializable { public Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription) throws IOException, 
InterruptedException { + validateWholeTableReference(tableReference); synchronized (BigQueryIOTest.tables) { TableContainer tableContainer = getTableContainer(tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java index edf2a55..fcf464f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java @@ -312,7 +312,14 @@ class FakeJobService implements JobService, Serializable { return new JobStatus().setState("FAILED").setErrorResult(new ErrorProto()); } if (existingTable == null) { - existingTable = new Table().setTableReference(destination).setSchema(schema); + TableReference strippedDestination = destination .clone() .setTableId(BigQueryHelpers.stripPartitionDecorator(destination.getTableId())); existingTable = new Table() .setTableReference(strippedDestination) .setSchema(schema); if (load.getTimePartitioning() != null) { existingTable = existingTable.setTimePartitioning(load.getTimePartitioning()); } -- To stop receiving notification emails like this one, please contact commits@beam.apache.org.